This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9baaf38d4efb13a84b1267155dcf5380a8580142 Author: haze518 <[email protected]> AuthorDate: Sat May 17 10:53:21 2025 +0600 del --- core/sdk/src/clients/mod.rs | 1 + core/sdk/src/clients/producer.rs | 7 +++++++ core/sdk/src/clients/producer_builder.rs | 3 +++ core/sdk/src/clients/send_mode.rs | 27 +++++++++++++++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/core/sdk/src/clients/mod.rs b/core/sdk/src/clients/mod.rs index be854f1b..fc872904 100644 --- a/core/sdk/src/clients/mod.rs +++ b/core/sdk/src/clients/mod.rs @@ -32,6 +32,7 @@ pub mod consumer; pub mod consumer_builder; pub mod producer; pub mod producer_builder; +pub mod send_mode; const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; const MAX_BATCH_SIZE: usize = 1000000; diff --git a/core/sdk/src/clients/producer.rs b/core/sdk/src/clients/producer.rs index f3706e3a..751b624f 100644 --- a/core/sdk/src/clients/producer.rs +++ b/core/sdk/src/clients/producer.rs @@ -1,3 +1,4 @@ +use super::send_mode::SendMode; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -25,6 +26,7 @@ use iggy_common::{ CompressionAlgorithm, DiagnosticEvent, EncryptorKind, IdKind, Identifier, IggyDuration, IggyError, IggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize, Partitioner, Partitioning, }; +use tokio::task::JoinHandle; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::time::Duration; @@ -44,6 +46,7 @@ pub struct IggyProducer { topic_name: String, batch_length: Option<usize>, partitioning: Option<Arc<Partitioning>>, + send_mode: SendMode, encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time_micros: u64, @@ -58,6 +61,8 @@ pub struct IggyProducer { last_sent_at: Arc<AtomicU64>, send_retries_count: Option<u32>, send_retries_interval: Option<IggyDuration>, + + _join_handle: Option<JoinHandle<()>>, } impl IggyProducer { @@ -92,6 +97,7 @@ impl IggyProducer { topic_name, batch_length, partitioning: partitioning.map(Arc::new), + send_mode: SendMode::default(), encryptor, partitioner, linger_time_micros: interval.map_or(0, |i| i.as_micros()), @@ -106,6 +112,7 @@ impl IggyProducer { last_sent_at: Arc::new(AtomicU64::new(0)), send_retries_count, send_retries_interval, + _join_handle: None, } } diff --git a/core/sdk/src/clients/producer_builder.rs b/core/sdk/src/clients/producer_builder.rs index 76ecfeea..395774b1 100644 --- a/core/sdk/src/clients/producer_builder.rs +++ b/core/sdk/src/clients/producer_builder.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use super::send_mode::SendMode; use super::MAX_BATCH_SIZE; use crate::prelude::IggyProducer; use iggy_binary_protocol::Client; @@ -36,6 +37,7 @@ pub struct IggyProducerBuilder { encryptor: Option<Arc<EncryptorKind>>, partitioner: Option<Arc<dyn Partitioner>>, linger_time: Option<IggyDuration>, + send_mode: SendMode, create_stream_if_not_exists: bool, create_topic_if_not_exists: bool, topic_partitions_count: u32, @@ -67,6 +69,7 @@ impl IggyProducerBuilder { partitioning: None, encryptor, partitioner, + send_mode: SendMode::default(), linger_time: Some(IggyDuration::from(1000)), create_stream_if_not_exists: true, create_topic_if_not_exists: true, diff --git a/core/sdk/src/clients/send_mode.rs b/core/sdk/src/clients/send_mode.rs new file mode 100644 index 00000000..98b3abef --- /dev/null +++ b/core/sdk/src/clients/send_mode.rs @@ -0,0 +1,27 @@ +use iggy_common::IggyDuration; + +#[derive(Debug, Clone, Default)] +pub enum SendMode { + #[default] + Sync, + Background(BackgroundConfig), +} + +#[derive(Debug, Clone)] +/// Determines how the `send_messages` API should behave when problem is encountered +pub enum BackpressureMode { + /// Block until the send succeeds + Block, + /// Block with a timeout, after which the send fails + BlockWithTimeout(IggyDuration), + /// Fail immediately without retrying + FailImmediately, +} + +#[derive(Debug, Clone)] +pub struct BackgroundConfig { + pub max_in_flight: usize, + pub in_flight_timeout: Option<IggyDuration>, + pub batch_size: Option<usize>, + pub failure_mode: BackpressureMode, +}
