This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c1f22af6f9807fd630aa1af818fd83339d5673a1
Author: haze518 <[email protected]>
AuthorDate: Thu May 15 09:45:15 2025 +0600

    rename batch_size to batch_length
---
 README.md                                          |  4 +--
 core/examples/src/multi-tenant/consumer/main.rs    |  4 +--
 core/examples/src/multi-tenant/producer/main.rs    | 14 +++++-----
 core/examples/src/new-sdk/consumer/main.rs         |  2 +-
 core/examples/src/new-sdk/producer/main.rs         |  2 +-
 .../stream-builder/stream-consumer-config/main.rs  |  2 +-
 .../stream-builder/stream-producer-config/main.rs  |  2 +-
 .../tests/server/scenarios/system_scenario.rs      | 10 +++----
 core/integration/tests/streaming/get_by_offset.rs  | 14 +++++-----
 .../tests/streaming/get_by_timestamp.rs            | 14 +++++-----
 core/sdk/src/clients/consumer.rs                   |  8 +++---
 core/sdk/src/clients/consumer_builder.rs           | 13 +++++----
 core/sdk/src/clients/producer.rs                   | 16 +++++------
 core/sdk/src/clients/producer_builder.rs           | 18 ++++++------
 .../stream_builder/build/build_iggy_consumer.rs    |  4 +--
 .../stream_builder/build/build_iggy_producer.rs    |  4 +--
 .../stream_builder/config/config_iggy_consumer.rs  | 32 +++++++++++-----------
 .../stream_builder/config/config_iggy_producer.rs  | 32 +++++++++++-----------
 .../stream_builder/config/config_iggy_stream.rs    | 20 +++++++-------
 19 files changed, 109 insertions(+), 106 deletions(-)

diff --git a/README.md b/README.md
index 899a05d1..ac89de64 100644
--- a/README.md
+++ b/README.md
@@ -248,7 +248,7 @@ let client = IggyClient::from_connection_string("iggy://user:secret@localhost:80
 // Create a producer for the given stream and one of its topics
 let mut producer = client
     .producer("dev01", "events")?
-    .batch_size(1000)
+    .batch_length(1000)
     .send_interval(IggyDuration::from_str("1ms")?)
     .partitioning(Partitioning::balanced())
     .build();
@@ -270,7 +270,7 @@ let mut consumer = client
     .auto_join_consumer_group()
     .polling_strategy(PollingStrategy::next())
     .poll_interval(IggyDuration::from_str("1ms")?)
-    .batch_size(1000)
+    .batch_length(1000)
     .build();
 
 consumer.init().await?;
diff --git a/core/examples/src/multi-tenant/consumer/main.rs b/core/examples/src/multi-tenant/consumer/main.rs
index 81b0c3eb..af947f60 100644
--- a/core/examples/src/multi-tenant/consumer/main.rs
+++ b/core/examples/src/multi-tenant/consumer/main.rs
@@ -276,7 +276,7 @@ async fn create_consumers(
     consumers_count: u32,
     stream: &str,
     topics: &[&str],
-    batch_size: u32,
+    batch_length: u32,
     interval: &str,
 ) -> Result<Vec<TenantConsumer>, IggyError> {
     let mut consumers = Vec::new();
@@ -284,7 +284,7 @@ async fn create_consumers(
         for id in 1..=consumers_count {
             let mut consumer = client
                 .consumer_group(CONSUMER_GROUP, stream, topic)?
-                .batch_size(batch_size)
+                .batch_length(batch_length)
                 .poll_interval(IggyDuration::from_str(interval).expect("Invalid duration"))
                 .polling_strategy(PollingStrategy::next())
                 .auto_join_consumer_group()
diff --git a/core/examples/src/multi-tenant/producer/main.rs b/core/examples/src/multi-tenant/producer/main.rs
index b9683c64..2df9daa8 100644
--- a/core/examples/src/multi-tenant/producer/main.rs
+++ b/core/examples/src/multi-tenant/producer/main.rs
@@ -189,7 +189,7 @@ fn start_producers(
     tenant_id: u32,
     producers: Vec<TenantProducer>,
     batches_count: u64,
-    batch_size: u32,
+    batch_length: u32,
 ) -> Vec<JoinHandle<()>> {
     let mut tasks = Vec::new();
     let topics_count = producers
@@ -221,8 +221,8 @@ fn start_producers(
                     _ => panic!("Invalid topic"),
                 };
 
-                let mut messages = Vec::with_capacity(batch_size as usize);
-                for _ in 1..=batch_size {
+                let mut messages = Vec::with_capacity(batch_length as usize);
+                for _ in 1..=batch_length {
                     let payload = format!("{message}-{producer_id}-{message_id}");
                     let message = IggyMessage::from_str(&payload).expect("Invalid message");
                     messages.push(message);
@@ -230,7 +230,7 @@ fn start_producers(
 
                 if let Err(error) = producer.producer.send(messages).await {
                     error!(
-                        "Failed to send: {batch_size} message(s) to: {} -> {} by tenant: {tenant_id}, producer: {producer_id} with error: {error}",
+                        "Failed to send: {batch_length} message(s) to: {} -> {} by tenant: {tenant_id}, producer: {producer_id} with error: {error}",
                         producer.stream, producer.topic,
                     );
                     continue;
@@ -238,7 +238,7 @@ fn start_producers(
 
                 counter += 1;
                 info!(
-                    "Sent: {batch_size} message(s) by tenant: {tenant_id}, producer: {producer_id}, to: {} -> {}",
+                    "Sent: {batch_length} message(s) by tenant: {tenant_id}, producer: {producer_id}, to: {} -> {}",
                     producer.stream, producer.topic
                 );
             }
@@ -254,7 +254,7 @@ async fn create_producers(
     partitions_count: u32,
     stream: &str,
     topics: &[&str],
-    batch_size: u32,
+    batch_length: u32,
     interval: &str,
 ) -> Result<Vec<TenantProducer>, IggyError> {
     let mut producers = Vec::new();
@@ -262,7 +262,7 @@ async fn create_producers(
         for id in 1..=producers_count {
             let mut producer = client
                 .producer(stream, topic)?
-                .batch_size(batch_size)
+                .batch_length(batch_length)
                 .send_interval(IggyDuration::from_str(interval).expect("Invalid duration"))
                 .partitioning(Partitioning::balanced())
                 .create_topic_if_not_exists(
diff --git a/core/examples/src/new-sdk/consumer/main.rs b/core/examples/src/new-sdk/consumer/main.rs
index cd20b9c7..aec1d464 100644
--- a/core/examples/src/new-sdk/consumer/main.rs
+++ b/core/examples/src/new-sdk/consumer/main.rs
@@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
     .auto_join_consumer_group()
     .polling_strategy(PollingStrategy::next())
     .poll_interval(IggyDuration::from_str(&args.interval)?)
-    .batch_size(args.messages_per_batch)
+    .batch_length(args.messages_per_batch)
     .build();
 
     consumer.init().await?;
diff --git a/core/examples/src/new-sdk/producer/main.rs b/core/examples/src/new-sdk/producer/main.rs
index b909921c..8e3fd899 100644
--- a/core/examples/src/new-sdk/producer/main.rs
+++ b/core/examples/src/new-sdk/producer/main.rs
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
     client.connect().await?;
     let mut producer = client
         .producer(&args.stream_id, &args.topic_id)?
-        .batch_size(args.messages_per_batch)
+        .batch_length(args.messages_per_batch)
         .send_interval(IggyDuration::from_str(&args.interval)?)
         .partitioning(Partitioning::balanced())
         .create_topic_if_not_exists(
diff --git a/core/examples/src/stream-builder/stream-consumer-config/main.rs b/core/examples/src/stream-builder/stream-consumer-config/main.rs
index 8006b0ce..84e86c65 100644
--- a/core/examples/src/stream-builder/stream-consumer-config/main.rs
+++ b/core/examples/src/stream-builder/stream-consumer-config/main.rs
@@ -48,7 +48,7 @@ async fn main() -> Result<(), IggyError> {
         // The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data.
         // Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup.
         // Note, this only applies to batch send messages. Single messages are sent immediately.
-        .batch_size(100)
+        .batch_length(100)
         // Create the stream if it doesn't exist.
         .create_stream_if_not_exists(true)
         // Create the topic if it doesn't exist.
diff --git a/core/examples/src/stream-builder/stream-producer-config/main.rs b/core/examples/src/stream-builder/stream-producer-config/main.rs
index 5dd6e481..f501a477 100644
--- a/core/examples/src/stream-builder/stream-producer-config/main.rs
+++ b/core/examples/src/stream-builder/stream-producer-config/main.rs
@@ -44,7 +44,7 @@ async fn main() -> Result<(), IggyError> {
         // The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data.
         // Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup.
         // Note, this only applies to batch send messages. Single messages are sent immediately.
-        .batch_size(100)
+        .batch_length(100)
         // Sets the interval between sending the messages. Affects latency so you want to benchmark this value.
         .send_interval(IggyDuration::from_str("5ms").unwrap())
         // `Partitioning` is used to specify to which partition the messages should be sent.
diff --git a/core/integration/tests/server/scenarios/system_scenario.rs b/core/integration/tests/server/scenarios/system_scenario.rs
index 1ae40637..c70968c8 100644
--- a/core/integration/tests/server/scenarios/system_scenario.rs
+++ b/core/integration/tests/server/scenarios/system_scenario.rs
@@ -255,9 +255,9 @@ pub async fn run(client_factory: &dyn ClientFactory) {
 
     // 19. Messages should be also polled in the smaller batches
     let batches_count = 10;
-    let batch_size = MESSAGES_COUNT / batches_count;
+    let batch_length = MESSAGES_COUNT / batches_count;
     for i in 0..batches_count {
-        let start_offset = (i * batch_size) as u64;
+        let start_offset = (i * batch_length) as u64;
         let polled_messages = client
             .poll_messages(
                 &Identifier::numeric(STREAM_ID).unwrap(),
@@ -265,13 +265,13 @@ pub async fn run(client_factory: &dyn ClientFactory) {
                 Some(PARTITION_ID),
                 &consumer,
                 &PollingStrategy::offset(start_offset),
-                batch_size,
+                batch_length,
                 false,
             )
             .await
             .unwrap();
-        assert_eq!(polled_messages.messages.len() as u32, batch_size);
-        for i in 0..batch_size as u64 {
+        assert_eq!(polled_messages.messages.len() as u32, batch_length);
+        for i in 0..batch_length as u64 {
             let offset = start_offset + i;
             let message = polled_messages.messages.get(i as usize).unwrap();
             assert_message(message, offset);
diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs
index 5652391f..28320d5e 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -82,14 +82,14 @@ fn very_large_batches() -> Vec<u32> {
 #[tokio::test]
 async fn test_get_messages_by_offset(
     message_size: IggyByteSize,
-    batch_sizes: Vec<u32>,
+    batch_lengths: Vec<u32>,
     messages_required_to_save: u32,
     segment_size: IggyByteSize,
     cache_indexes: CacheIndexesConfig,
 ) {
     println!(
         "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}",
-        message_size, batch_sizes, messages_required_to_save, segment_size, cache_indexes
+        message_size, batch_lengths, messages_required_to_save, segment_size, cache_indexes
     );
 
     let setup = TestSetup::init().await;
@@ -97,7 +97,7 @@ async fn test_get_messages_by_offset(
     let topic_id = 1;
     let partition_id = 1;
 
-    let total_messages_count = batch_sizes.iter().sum();
+    let total_messages_count = batch_lengths.iter().sum();
 
     let config = Arc::new(SystemConfig {
         path: setup.config.path.to_string(),
@@ -169,11 +169,11 @@ async fn test_get_messages_by_offset(
     }
 
     // Keep track of offsets after each batch
-    let mut batch_offsets = Vec::with_capacity(batch_sizes.len());
+    let mut batch_offsets = Vec::with_capacity(batch_lengths.len());
     let mut current_pos = 0;
 
     // Append all batches as defined in the test matrix
-    for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() {
+    for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
         // If we've generated too many messages, skip the rest
         if current_pos + batch_len as usize > all_messages.len() {
             break;
@@ -182,7 +182,7 @@ async fn test_get_messages_by_offset(
         println!(
             "Appending batch {}/{} with {} messages",
             batch_idx + 1,
-            batch_sizes.len(),
+            batch_lengths.len(),
             batch_len
         );
 
@@ -221,7 +221,7 @@ async fn test_get_messages_by_offset(
     // Test 2: Get messages from middle (after 3rd batch)
     if batch_offsets.len() >= 3 {
         let middle_offset = batch_offsets[2];
-        let prior_batches_sum: u32 = batch_sizes[..3].iter().sum();
+        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
         let remaining_messages = total_sent_messages - prior_batches_sum;
 
         let middle_messages = partition
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs
index d68e4811..c5b9bf01 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -84,14 +84,14 @@ fn very_large_batches() -> Vec<u32> {
 #[tokio::test]
 async fn test_get_messages_by_timestamp(
     message_size: IggyByteSize,
-    batch_sizes: Vec<u32>,
+    batch_lengths: Vec<u32>,
     messages_required_to_save: u32,
     segment_size: IggyByteSize,
     cache_indexes: CacheIndexesConfig,
 ) {
     println!(
         "Running test with message_size: {}, batches: {:?}, messages_required_to_save: {}, segment_size: {}, cache_indexes: {}",
-        message_size, batch_sizes, messages_required_to_save, segment_size, cache_indexes
+        message_size, batch_lengths, messages_required_to_save, segment_size, cache_indexes
     );
 
     let setup = TestSetup::init().await;
@@ -99,7 +99,7 @@ async fn test_get_messages_by_timestamp(
     let topic_id = 1;
     let partition_id = 1;
 
-    let total_messages_count = batch_sizes.iter().sum();
+    let total_messages_count = batch_lengths.iter().sum();
 
     let config = Arc::new(SystemConfig {
         path: setup.config.path.to_string(),
@@ -173,11 +173,11 @@ async fn test_get_messages_by_timestamp(
 
     // Timestamp tracking for messages
     let initial_timestamp = IggyTimestamp::now();
-    let mut batch_timestamps = Vec::with_capacity(batch_sizes.len());
+    let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
     let mut current_pos = 0;
 
     // Append all batches as defined in the test matrix with separate timestamps
-    for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() {
+    for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
         // Add a small delay between batches to ensure distinct timestamps
         sleep(std::time::Duration::from_millis(2));
 
@@ -189,7 +189,7 @@ async fn test_get_messages_by_timestamp(
         println!(
             "Appending batch {}/{} with {} messages",
             batch_idx + 1,
-            batch_sizes.len(),
+            batch_lengths.len(),
             batch_len
         );
 
@@ -241,7 +241,7 @@ async fn test_get_messages_by_timestamp(
         let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000);
 
         // Calculate how many messages should be in batches after the 3rd
-        let prior_batches_sum: u32 = batch_sizes[..3].iter().sum();
+        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
         let remaining_messages = total_sent_messages - prior_batches_sum;
 
         let middle_messages = partition
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 795f4edc..3b5f2d17 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -103,7 +103,7 @@ pub struct IggyConsumer {
     partition_id: Option<u32>,
     polling_strategy: PollingStrategy,
     poll_interval_micros: u64,
-    batch_size: u32,
+    batch_length: u32,
     auto_commit: AutoCommit,
     auto_commit_after_polling: bool,
     auto_join_consumer_group: bool,
@@ -137,7 +137,7 @@ impl IggyConsumer {
         partition_id: Option<u32>,
         polling_interval: Option<IggyDuration>,
         polling_strategy: PollingStrategy,
-        batch_size: u32,
+        batch_length: u32,
         auto_commit: AutoCommit,
         auto_join_consumer_group: bool,
         create_consumer_group_if_not_exists: bool,
@@ -165,7 +165,7 @@ impl IggyConsumer {
             last_consumed_offsets: Arc::new(DashMap::new()),
             current_offsets: Arc::new(DashMap::new()),
             poll_future: None,
-            batch_size,
+            batch_length,
             auto_commit,
             auto_commit_after_polling: matches!(
                 auto_commit,
@@ -609,7 +609,7 @@ impl IggyConsumer {
         let consumer = self.consumer.clone();
         let polling_strategy = self.polling_strategy;
         let client = self.client.clone();
-        let count = self.batch_size;
+        let count = self.batch_length;
         let auto_commit_after_polling = self.auto_commit_after_polling;
         let auto_commit_enabled = self.auto_commit != AutoCommit::Disabled;
         let interval = self.poll_interval_micros;
diff --git a/core/sdk/src/clients/consumer_builder.rs b/core/sdk/src/clients/consumer_builder.rs
index 5008270f..908a367c 100644
--- a/core/sdk/src/clients/consumer_builder.rs
+++ b/core/sdk/src/clients/consumer_builder.rs
@@ -32,7 +32,7 @@ pub struct IggyConsumerBuilder {
     partition: Option<u32>,
     polling_strategy: PollingStrategy,
     polling_interval: Option<IggyDuration>,
-    batch_size: u32,
+    batch_length: u32,
     auto_commit: AutoCommit,
     auto_join_consumer_group: bool,
     create_consumer_group_if_not_exists: bool,
@@ -63,7 +63,7 @@ impl IggyConsumerBuilder {
             topic: topic_id,
             partition: partition_id,
             polling_strategy: PollingStrategy::next(),
-            batch_size: 1000,
+            batch_length: 1000,
             auto_commit: AutoCommit::IntervalOrWhen(
                 IggyDuration::ONE_SECOND,
                 AutoCommitWhen::PollingMessages,
@@ -103,8 +103,11 @@ impl IggyConsumerBuilder {
     }
 
     /// Sets the batch size for polling messages.
-    pub fn batch_size(self, batch_size: u32) -> Self {
-        Self { batch_size, ..self }
+    pub fn batch_length(self, batch_length: u32) -> Self {
+        Self {
+            batch_length,
+            ..self
+        }
     }
 
     /// Sets the auto-commit configuration for storing the offset on the server.
@@ -226,7 +229,7 @@ impl IggyConsumerBuilder {
             self.partition,
             self.polling_interval,
             self.polling_strategy,
-            self.batch_size,
+            self.batch_length,
             self.auto_commit,
             self.auto_join_consumer_group,
             self.create_consumer_group_if_not_exists,
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index 7e9443c7..c9c203bc 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -42,7 +42,7 @@ pub struct IggyProducer {
     stream_name: String,
     topic_id: Arc<Identifier>,
     topic_name: String,
-    batch_size: Option<usize>,
+    batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
@@ -68,7 +68,7 @@ impl IggyProducer {
         stream_name: String,
         topic: Identifier,
         topic_name: String,
-        batch_size: Option<usize>,
+        batch_length: Option<usize>,
         partitioning: Option<Partitioning>,
         encryptor: Option<Arc<EncryptorKind>>,
         partitioner: Option<Arc<dyn Partitioner>>,
@@ -90,7 +90,7 @@ impl IggyProducer {
             stream_name,
             topic_id: Arc::new(topic),
             topic_name,
-            batch_size,
+            batch_length,
             partitioning: partitioning.map(Arc::new),
             encryptor,
             partitioner,
@@ -302,8 +302,8 @@ impl IggyProducer {
     ) -> Result<(), IggyError> {
         self.encrypt_messages(&mut messages)?;
         let partitioning = self.get_partitioning(&stream, &topic, &messages, partitioning)?;
-        let batch_size = self.batch_size.unwrap_or(MAX_BATCH_SIZE);
-        let batches = messages.chunks_mut(batch_size);
+        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
+        let batches = messages.chunks_mut(batch_length);
         let mut current_batch = 1;
         let batches_count = batches.len();
         for batch in batches {
@@ -339,8 +339,8 @@ impl IggyProducer {
         trace!("No batch size specified, sending messages immediately.");
         self.encrypt_messages(&mut messages)?;
         let partitioning = self.get_partitioning(stream, topic, &messages, partitioning)?;
-        let batch_size = self.batch_size.unwrap_or(MAX_BATCH_SIZE);
-        if messages.len() <= batch_size {
+        let batch_length = self.batch_length.unwrap_or(MAX_BATCH_SIZE);
+        if messages.len() <= batch_length {
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
             self.try_send_messages(stream, topic, &partitioning, &mut messages)
@@ -348,7 +348,7 @@ impl IggyProducer {
             return Ok(());
         }
 
-        for batch in messages.chunks_mut(batch_size) {
+        for batch in messages.chunks_mut(batch_length) {
             self.last_sent_at
                 .store(IggyTimestamp::now().into(), ORDERING);
             self.try_send_messages(stream, topic, &partitioning, batch)
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index a25459cd..66ba3c08 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -31,7 +31,7 @@ pub struct IggyProducerBuilder {
     stream_name: String,
     topic: Identifier,
     topic_name: String,
-    batch_size: Option<usize>,
+    batch_length: Option<usize>,
     partitioning: Option<Partitioning>,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
@@ -63,7 +63,7 @@ impl IggyProducerBuilder {
             stream_name,
             topic,
             topic_name,
-            batch_size: Some(1000),
+            batch_length: Some(1000),
             partitioning: None,
             encryptor,
             partitioner,
@@ -90,26 +90,26 @@ impl IggyProducerBuilder {
     }
 
     /// Sets the number of messages to batch before sending them, can be combined with `interval`.
-    pub fn batch_size(self, batch_size: u32) -> Self {
+    pub fn batch_length(self, batch_length: u32) -> Self {
         Self {
-            batch_size: if batch_size == 0 {
+            batch_length: if batch_length == 0 {
                 None
             } else {
-                Some(batch_size.min(MAX_BATCH_SIZE as u32) as usize)
+                Some(batch_length.min(MAX_BATCH_SIZE as u32) as usize)
             },
             ..self
         }
     }
 
     /// Clears the batch size.
-    pub fn without_batch_size(self) -> Self {
+    pub fn without_batch_length(self) -> Self {
         Self {
-            batch_size: None,
+            batch_length: None,
             ..self
         }
     }
 
-    /// Sets the interval between sending the messages, can be combined with `batch_size`.
+    /// Sets the interval between sending the messages, can be combined with `batch_length`.
     pub fn send_interval(self, interval: IggyDuration) -> Self {
         Self {
             send_interval: Some(interval),
@@ -236,7 +236,7 @@ impl IggyProducerBuilder {
             self.stream_name,
             self.topic,
             self.topic_name,
-            self.batch_size,
+            self.batch_length,
             self.partitioning,
             self.encryptor,
             self.partitioner,
diff --git a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs
index b3d79ab4..dd7260bd 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_consumer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_consumer.rs
@@ -48,7 +48,7 @@ pub(crate) async fn build_iggy_consumer(
     let auto_commit = config.auto_commit();
     let consumer_kind = config.consumer_kind();
     let consumer_name = config.consumer_name();
-    let batch_size = config.batch_size();
+    let batch_length = config.batch_length();
     let polling_interval = config.polling_interval();
     let polling_strategy = config.polling_strategy();
     let partition = config.partitions_count();
@@ -64,7 +64,7 @@ pub(crate) async fn build_iggy_consumer(
     .auto_join_consumer_group()
     .polling_strategy(polling_strategy)
     .poll_interval(polling_interval)
-    .batch_size(batch_size)
+    .batch_length(batch_length)
     .polling_retry_interval(polling_retry_interval);
 
     if let Some(encryptor) = config.encryptor() {
diff --git a/core/sdk/src/stream_builder/build/build_iggy_producer.rs b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
index 6b7f2d18..e8f5a13f 100644
--- a/core/sdk/src/stream_builder/build/build_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/build/build_iggy_producer.rs
@@ -47,7 +47,7 @@ pub(crate) async fn build_iggy_producer(
     let topic = config.topic_name();
     let topic_partitions_count = config.topic_partitions_count();
     let topic_replication_factor = config.topic_replication_factor();
-    let batch_size = config.batch_size();
+    let batch_length = config.batch_length();
     let send_interval = config.send_interval();
     let partitioning = config.partitioning().to_owned();
     let send_retries = config.send_retries_count();
@@ -56,7 +56,7 @@ pub(crate) async fn build_iggy_producer(
     trace!("Build iggy producer");
     let mut builder = client
         .producer(stream, topic)?
-        .batch_size(batch_size)
+        .batch_length(batch_length)
         .send_interval(send_interval)
         .partitioning(partitioning)
         .create_stream_if_not_exists()
diff --git a/core/sdk/src/stream_builder/config/config_iggy_consumer.rs b/core/sdk/src/stream_builder/config/config_iggy_consumer.rs
index e7496384..4a5ce32c 100644
--- a/core/sdk/src/stream_builder/config/config_iggy_consumer.rs
+++ b/core/sdk/src/stream_builder/config/config_iggy_consumer.rs
@@ -37,9 +37,9 @@ pub struct IggyConsumerConfig {
     topic_name: String,
     /// The auto-commit configuration for storing the message offset on the server. See  `AutoCommit` for details.
     auto_commit: AutoCommit,
-    /// The max number of messages to send in a batch. The greater the batch size, the higher the throughput for bulk data.
+    /// The max number of messages to send in a batch. The greater the batch length, the higher the throughput for bulk data.
     /// Note, there is a tradeoff between batch size and latency, so you want to benchmark your setup.
-    batch_size: u32,
+    batch_length: u32,
     /// Create the stream if it doesn't exist.
     create_stream_if_not_exists: bool,
     /// Create the topic if it doesn't exist.
@@ -78,7 +78,7 @@ impl Default for IggyConsumerConfig {
             topic_id,
             topic_name: "test_topic".to_string(),
             auto_commit: AutoCommit::When(AutoCommitWhen::PollingMessages),
-            batch_size: 100,
+            batch_length: 100,
             create_stream_if_not_exists: false,
             create_topic_if_not_exists: false,
             consumer_name: "test_consumer".to_string(),
@@ -105,7 +105,7 @@ impl IggyConsumerConfig {
     /// * `topic_id` - The topic id.
     /// * `topic_name` - The topic name.
     /// * `auto_commit` - The auto commit config.
-    /// * `batch_size` - The max number of messages to send in a batch.
+    /// * `batch_length` - The max number of messages to send in a batch.
     /// * `create_stream_if_not_exists` - Whether to create the stream if it does not exists.
     /// * `create_topic_if_not_exists` - Whether to create the topic if it does not exists.
     /// * `consumer_name` - The consumer name.
@@ -130,7 +130,7 @@ impl IggyConsumerConfig {
         topic_id: Identifier,
         topic_name: String,
         auto_commit: AutoCommit,
-        batch_size: u32,
+        batch_length: u32,
         create_stream_if_not_exists: bool,
         create_topic_if_not_exists: bool,
         consumer_name: String,
@@ -150,7 +150,7 @@ impl IggyConsumerConfig {
             topic_id,
             topic_name,
             auto_commit,
-            batch_size,
+            batch_length,
             create_stream_if_not_exists,
             create_topic_if_not_exists,
             consumer_name,
@@ -172,7 +172,7 @@ impl IggyConsumerConfig {
     ///
     /// * `stream` - The stream name.
     /// * `topic` - The topic name.
-    /// * `batch_size` - The max number of messages to send in a batch.
+    /// * `batch_length` - The max number of messages to send in a batch.
     /// * `polling_interval` - The interval between polling for new messages.
     ///
     /// Returns:
@@ -181,7 +181,7 @@ impl IggyConsumerConfig {
     pub fn from_stream_topic(
         stream: &str,
         topic: &str,
-        batch_size: u32,
+        batch_length: u32,
         polling_interval: IggyDuration,
     ) -> Result<Self, IggyError> {
         let stream_id = Identifier::from_str_value(stream)?;
@@ -193,7 +193,7 @@ impl IggyConsumerConfig {
             topic_id,
             topic_name: topic.to_string(),
             auto_commit: AutoCommit::When(AutoCommitWhen::PollingMessages),
-            batch_size,
+            batch_length,
             create_stream_if_not_exists: false,
             create_topic_if_not_exists: false,
             consumer_name: format!("consumer-{}-{}", stream, topic),
@@ -231,8 +231,8 @@ impl IggyConsumerConfig {
         self.auto_commit
     }
 
-    pub fn batch_size(&self) -> u32 {
-        self.batch_size
+    pub fn batch_length(&self) -> u32 {
+        self.batch_length
     }
     pub fn create_stream_if_not_exists(&self) -> bool {
         self.create_stream_if_not_exists
@@ -299,7 +299,7 @@ mod tests {
             .topic_id(topic_id)
             .topic_name("test_topic".to_string())
             .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
-            .batch_size(100)
+            .batch_length(100)
             .create_stream_if_not_exists(true)
             .create_topic_if_not_exists(true)
             .consumer_name("test_consumer".to_string())
@@ -326,7 +326,7 @@ mod tests {
             config.auto_commit(),
             AutoCommit::When(AutoCommitWhen::PollingMessages)
         );
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert!(config.create_stream_if_not_exists());
         assert!(config.create_topic_if_not_exists());
         assert_eq!(config.consumer_name(), "test_consumer");
@@ -361,7 +361,7 @@ mod tests {
             config.auto_commit(),
             AutoCommit::When(AutoCommitWhen::PollingMessages)
         );
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert!(!config.create_stream_if_not_exists());
         assert!(!config.create_topic_if_not_exists());
         assert_eq!(config.consumer_name(), "test_consumer");
@@ -415,7 +415,7 @@ mod tests {
             config.auto_commit(),
             AutoCommit::When(AutoCommitWhen::PollingMessages)
         );
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert!(!config.create_stream_if_not_exists());
         assert!(!config.create_topic_if_not_exists());
         assert_eq!(config.consumer_name(), "test_consumer");
@@ -449,7 +449,7 @@ mod tests {
 
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert!(!config.create_stream_if_not_exists());
         assert!(!config.create_topic_if_not_exists());
         assert_eq!(config.consumer_name(), "consumer-test_stream-test_topic");
diff --git a/core/sdk/src/stream_builder/config/config_iggy_producer.rs b/core/sdk/src/stream_builder/config/config_iggy_producer.rs
index 1ca80244..0e5c4f76 100644
--- a/core/sdk/src/stream_builder/config/config_iggy_producer.rs
+++ b/core/sdk/src/stream_builder/config/config_iggy_producer.rs
@@ -37,8 +37,8 @@ pub struct IggyProducerConfig {
     /// Set the topic replication factor
     topic_replication_factor: Option<u8>,
     /// The max number of messages to send in a batch. Must be greater than 0.
-    batch_size: u32,
-    /// Sets the interval between sending the messages, can be combined with `batch_size`.
+    batch_length: u32,
+    /// Sets the interval between sending the messages, can be combined with `batch_length`.
     send_interval: IggyDuration,
     /// Specifies to which partition the messages should be sent.
     partitioning: Partitioning,
@@ -61,7 +61,7 @@ impl Default for IggyProducerConfig {
             stream_name: "test_stream".to_string(),
             topic_id,
             topic_name: "test_topic".to_string(),
-            batch_size: 100,
+            batch_length: 100,
             send_interval: IggyDuration::from_str("5ms").unwrap(),
             partitioning: Partitioning::balanced(),
             topic_partitions_count: 1,
@@ -84,7 +84,7 @@ impl IggyProducerConfig {
     /// * `topic_name` - The topic name.
     /// * `topic_partitions_count` - The number of partitions to create.
     /// * `topic_replication_factor` - The replication factor to use.
-    /// * `batch_size` - The max number of messages to send in a batch.
+    /// * `batch_length` - The max number of messages to send in a batch.
     /// * `send_interval` - The interval between messages sent.
     /// * `partitioning` - The partitioning strategy to use.
     /// * `encryptor` - The encryptor to use.
@@ -102,7 +102,7 @@ impl IggyProducerConfig {
         topic_name: String,
         topic_partitions_count: u32,
         topic_replication_factor: Option<u8>,
-        batch_size: u32,
+        batch_length: u32,
         send_interval: IggyDuration,
         partitioning: Partitioning,
         encryptor: Option<Arc<EncryptorKind>>,
@@ -116,7 +116,7 @@ impl IggyProducerConfig {
             topic_name,
             topic_partitions_count,
             topic_replication_factor,
-            batch_size,
+            batch_length,
             send_interval,
             partitioning,
             encryptor,
@@ -132,7 +132,7 @@ impl IggyProducerConfig {
     ///
     /// * `stream` - The stream name.
     /// * `topic` - The topic name.
-    /// * `batch_size` - The max number of messages to send in a batch.
+    /// * `batch_length` - The max number of messages to send in a batch.
     /// * `send_interval` - The interval between messages sent.
     ///
     /// Returns:
@@ -141,7 +141,7 @@ impl IggyProducerConfig {
     pub fn from_stream_topic(
         stream: &str,
         topic: &str,
-        batch_size: u32,
+        batch_length: u32,
         send_interval: IggyDuration,
     ) -> Result<Self, IggyError> {
         let stream_id = Identifier::from_str_value(stream)?;
@@ -152,7 +152,7 @@ impl IggyProducerConfig {
             stream_name: stream.to_string(),
             topic_id,
             topic_name: topic.to_string(),
-            batch_size,
+            batch_length,
             send_interval,
             partitioning: Partitioning::balanced(),
             topic_partitions_count: 1,
@@ -181,8 +181,8 @@ impl IggyProducerConfig {
         &self.topic_name
     }
 
-    pub fn batch_size(&self) -> u32 {
-        self.batch_size
+    pub fn batch_length(&self) -> u32 {
+        self.batch_length
     }
 
     pub fn send_interval(&self) -> IggyDuration {
@@ -230,7 +230,7 @@ mod tests {
             .topic_id(Identifier::from_str_value(topic).unwrap())
             .topic_name(topic)
             .topic_partitions_count(3)
-            .batch_size(100)
+            .batch_length(100)
             .send_interval(IggyDuration::from_str("5ms").unwrap())
             .partitioning(Partitioning::balanced())
             .send_retries_count(3)
@@ -247,7 +247,7 @@ mod tests {
             &Identifier::from_str_value("test_topic").unwrap()
         );
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert_eq!(
             config.send_interval(),
             IggyDuration::from_str("5ms").unwrap()
@@ -272,7 +272,7 @@ mod tests {
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_id(), &topic_id);
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert_eq!(
             config.send_interval(),
             IggyDuration::from_str("5ms").unwrap()
@@ -310,7 +310,7 @@ mod tests {
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_id(), &topic_id);
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert_eq!(
             config.send_interval(),
             IggyDuration::from_str("5ms").unwrap()
@@ -341,7 +341,7 @@ mod tests {
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_id(), &topic_id);
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.batch_size(), 100);
+        assert_eq!(config.batch_length(), 100);
         assert_eq!(
             config.send_interval(),
             IggyDuration::from_str("5ms").unwrap()
diff --git a/core/sdk/src/stream_builder/config/config_iggy_stream.rs b/core/sdk/src/stream_builder/config/config_iggy_stream.rs
index 2015e60b..49bc570e 100644
--- a/core/sdk/src/stream_builder/config/config_iggy_stream.rs
+++ b/core/sdk/src/stream_builder/config/config_iggy_stream.rs
@@ -51,7 +51,7 @@ impl IggyStreamConfig {
     ///
     /// * `stream` - The stream name.
     /// * `topic` - The topic name.
-    /// * `batch_size` - The max number of messages to send in a batch.
+    /// * `batch_length` - The max number of messages to send in a batch.
     /// * `send_interval` - The interval between messages sent.
     /// * `polling_interval` - The interval between polling for new messages.
     ///
@@ -61,15 +61,15 @@ impl IggyStreamConfig {
     pub fn from_stream_topic(
         stream: &str,
         topic: &str,
-        batch_size: u32,
+        batch_length: u32,
         send_interval: IggyDuration,
         polling_interval: IggyDuration,
     ) -> Result<Self, IggyError> {
         let consumer_config =
-            IggyConsumerConfig::from_stream_topic(stream, topic, batch_size, polling_interval)?;
+            IggyConsumerConfig::from_stream_topic(stream, topic, batch_length, polling_interval)?;
 
         let producer_config =
-            IggyProducerConfig::from_stream_topic(stream, topic, batch_size, send_interval)?;
+            IggyProducerConfig::from_stream_topic(stream, topic, batch_length, send_interval)?;
 
         Ok(Self {
             consumer_config,
@@ -129,8 +129,8 @@ mod tests {
         let config = IggyStreamConfig::new(consumer_config, producer_config);
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.consumer_config().batch_size(), 100);
-        assert_eq!(config.producer_config().batch_size(), 100);
+        assert_eq!(config.consumer_config().batch_length(), 100);
+        assert_eq!(config.producer_config().batch_length(), 100);
         assert_eq!(
             config.consumer_config().polling_interval(),
             IggyDuration::from_str("5ms").unwrap()
@@ -146,8 +146,8 @@ mod tests {
         let config = IggyStreamConfig::default();
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.consumer_config().batch_size(), 100);
-        assert_eq!(config.producer_config().batch_size(), 100);
+        assert_eq!(config.consumer_config().batch_length(), 100);
+        assert_eq!(config.producer_config().batch_length(), 100);
         assert_eq!(
             config.consumer_config().polling_interval(),
             IggyDuration::from_str("5ms").unwrap()
@@ -173,8 +173,8 @@ mod tests {
 
         assert_eq!(config.stream_name(), "test_stream");
         assert_eq!(config.topic_name(), "test_topic");
-        assert_eq!(config.consumer_config().batch_size(), 100);
-        assert_eq!(config.producer_config().batch_size(), 100);
+        assert_eq!(config.consumer_config().batch_length(), 100);
+        assert_eq!(config.producer_config().batch_length(), 100);
         assert_eq!(
             config.consumer_config().polling_interval(),
             IggyDuration::from_str("5ms").unwrap()


Reply via email to