This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 9baaf38d4efb13a84b1267155dcf5380a8580142
Author: haze518 <[email protected]>
AuthorDate: Sat May 17 10:53:21 2025 +0600

    del
---
 core/sdk/src/clients/mod.rs              |  1 +
 core/sdk/src/clients/producer.rs         |  7 +++++++
 core/sdk/src/clients/producer_builder.rs |  3 +++
 core/sdk/src/clients/send_mode.rs        | 27 +++++++++++++++++++++++++++
 4 files changed, 38 insertions(+)

diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs
index be854f1b..fc872904 100644
--- a/core/sdk/src/clients/mod.rs
+++ b/core/sdk/src/clients/mod.rs
@@ -32,6 +32,7 @@ pub mod consumer;
 pub mod consumer_builder;
 pub mod producer;
 pub mod producer_builder;
+pub mod send_mode;
 
 const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;
 const MAX_BATCH_SIZE: usize = 1000000;
diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs
index f3706e3a..751b624f 100644
--- a/core/sdk/src/clients/producer.rs
+++ b/core/sdk/src/clients/producer.rs
@@ -1,3 +1,4 @@
+use super::send_mode::SendMode;
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,6 +26,7 @@ use iggy_common::{
     CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, IggyDuration,
     IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning,
 };
+use tokio::task::JoinHandle;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU64};
 use std::time::Duration;
@@ -44,6 +46,7 @@ pub struct IggyProducer {
     topic_name: String,
     batch_length: Option<usize>,
     partitioning: Option<Arc<Partitioning>>,
+    send_mode: SendMode,
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time_micros: u64,
@@ -58,6 +61,8 @@ pub struct IggyProducer {
     last_sent_at: Arc<AtomicU64>,
     send_retries_count: Option<u32>,
     send_retries_interval: Option<IggyDuration>,
+
+    _join_handle: Option<JoinHandle<()>>,
 }
 
 impl IggyProducer {
@@ -92,6 +97,7 @@ impl IggyProducer {
             topic_name,
             batch_length,
             partitioning: partitioning.map(Arc::new),
+            send_mode: SendMode::default(),
             encryptor,
             partitioner,
             linger_time_micros: interval.map_or(0, |i| i.as_micros()),
@@ -106,6 +112,7 @@ impl IggyProducer {
             last_sent_at: Arc::new(AtomicU64::new(0)),
             send_retries_count,
             send_retries_interval,
+            _join_handle: None,
         }
     }
 
diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs
index 76ecfeea..395774b1 100644
--- a/core/sdk/src/clients/producer_builder.rs
+++ b/core/sdk/src/clients/producer_builder.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use super::send_mode::SendMode;
 use super::MAX_BATCH_SIZE;
 use crate::prelude::IggyProducer;
 use iggy_binary_protocol::Client;
@@ -36,6 +37,7 @@ pub struct IggyProducerBuilder {
     encryptor: Option<Arc<EncryptorKind>>,
     partitioner: Option<Arc<dyn Partitioner>>,
     linger_time: Option<IggyDuration>,
+    send_mode: SendMode,
     create_stream_if_not_exists: bool,
     create_topic_if_not_exists: bool,
     topic_partitions_count: u32,
@@ -67,6 +69,7 @@ impl IggyProducerBuilder {
             partitioning: None,
             encryptor,
             partitioner,
+            send_mode: SendMode::default(),
             linger_time: Some(IggyDuration::from(1000)),
             create_stream_if_not_exists: true,
             create_topic_if_not_exists: true,
diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs
new file mode 100644
index 00000000..98b3abef
--- /dev/null
+++ b/core/sdk/src/clients/send_mode.rs
@@ -0,0 +1,27 @@
+use iggy_common::IggyDuration;
+
+#[derive(Debug, Clone, Default)]
+pub enum SendMode {
+    #[default]
+    Sync,
+    Background(BackgroundConfig),
+}
+
+#[derive(Debug, Clone)]
+/// Determines how the `send_messages` API should behave when problem is encountered
+pub enum BackpressureMode {
+    /// Block until the send succeeds
+    Block,
+    /// Block with a timeout, after which the send fails
+    BlockWithTimeout(IggyDuration),
+    /// Fail immediately without retrying
+    FailImmediately,
+}
+
+#[derive(Debug, Clone)]
+pub struct BackgroundConfig {
+    pub max_in_flight: usize,
+    pub in_flight_timeout: Option<IggyDuration>,
+    pub batch_size: Option<usize>,
+    pub failure_mode: BackpressureMode,
+}

Reply via email to