This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch impl_opentel_runtime_sdk in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0e03b65d4caa3424ef019d5ecec0dbe48226f815 Author: numminex <[email protected]> AuthorDate: Sat Sep 27 13:38:29 2025 +0200 feat(io_uring): implement the runtime trait from opentelemetry_sdk --- core/server/Cargo.toml | 2 - core/server/src/log/logger.rs | 5 ++- core/server/src/log/mod.rs | 1 + core/server/src/log/runtime.rs | 83 ++++++++++++++++++++++++++++++------------ 4 files changed, 64 insertions(+), 27 deletions(-) diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index c2c241e8..64f71a31 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -92,10 +92,8 @@ opentelemetry-otlp = { version = "0.30.0", features = [ ] } opentelemetry-semantic-conventions = "0.30.0" opentelemetry_sdk = { version = "0.30.0", features = [ - "rt-tokio", "logs", "trace", - "tokio", "experimental_async_runtime", "experimental_logs_batch_log_processor_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime", diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs index c83da4c8..59725685 100644 --- a/core/server/src/log/logger.rs +++ b/core/server/src/log/logger.rs @@ -16,6 +16,7 @@ * under the License. */ +use crate::log::runtime::CompioRuntime; use crate::VERSION; use crate::configs::server::{TelemetryConfig, TelemetryTransport}; use crate::configs::system::LoggingConfig; @@ -217,7 +218,7 @@ impl Logging { .with_log_processor( log_processor_with_async_runtime::BatchLogProcessor::builder( log_exporter, - runtime::Tokio, + CompioRuntime, ) .build(), ) @@ -249,7 +250,7 @@ impl Logging { .with_span_processor( span_processor_with_async_runtime::BatchSpanProcessor::builder( trace_exporter, - runtime::Tokio, + CompioRuntime ) .build(), ) diff --git a/core/server/src/log/mod.rs b/core/server/src/log/mod.rs index 2af4bb33..f3acb50b 100644 --- a/core/server/src/log/mod.rs +++ b/core/server/src/log/mod.rs @@ -16,3 +16,4 @@ * under the License. */ pub mod logger; +pub mod runtime; diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs index 36bbdd80..1683b34a 100644 --- a/core/server/src/log/runtime.rs +++ b/core/server/src/log/runtime.rs @@ -1,48 +1,85 @@ -use std::{pin::Pin, time::Duration}; +use std::{pin::Pin, task::Poll, time::Duration}; -use futures::{FutureExt, SinkExt, Stream}; +use futures::{channel::mpsc, future::poll_fn, FutureExt, SinkExt, Stream, StreamExt}; use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend}; #[derive(Clone)] -pub struct MonoioRuntime; +pub struct CompioRuntime; -impl Runtime for MonoioRuntime { +impl Runtime for CompioRuntime { fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { - // TODO: This wont' work, we init Opentelemetry in the main thread, when there is no instance of monoio runtime - // running yet.... - monoio::spawn(future); + // It's fine to detach this task, the documentation for `spawn` method on `Runtime` trait says: + // + // + // "This is mainly used to run batch span processing in the background. Note, that the function + // does not return a handle. OpenTelemetry will use a different way to wait for the future to + // finish when the caller shuts down."" + compio::runtime::spawn(future).detach(); } fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static { - let sleep = Sleep::new(duration); - sleep + compio::time::sleep(duration) } } +#[derive(Debug)] +pub struct CompioSender<T> { + sender: mpsc::UnboundedSender<T>, +} + +impl<T> CompioSender<T> { + pub fn new(sender: mpsc::UnboundedSender<T>) -> Self { + Self { sender } + } +} -pub struct Sleep { - pub inner: Pin<Box<monoio::time::Sleep>>, +// Safety: Since we use compio runtime which is single-threaded, or rather the Future: !Send + !Sync, +// we can implement those traits, to satisfy the trait bounds from `Runtime` and `RuntimeChannel` traits. +unsafe impl<T> Send for CompioSender<T> {} +unsafe impl<T> Sync for CompioSender<T> {} + +impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T> { + type Message = T; + + fn try_send(&self, item: Self::Message) -> Result<(), opentelemetry_sdk::runtime::TrySendError> { + self.sender.unbounded_send(item).map_err(|_err| { + // Unbounded channels can only fail if disconnected, never full + opentelemetry_sdk::runtime::TrySendError::ChannelClosed + }) + } } -impl Sleep { - pub fn new(duration: Duration) -> Self { - Self { - inner: Box::pin(monoio::time::sleep(duration)), - } +pub struct CompioReceiver<T> { + receiver: mpsc::UnboundedReceiver<T>, +} + +impl<T> CompioReceiver<T> { + pub fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self { + Self { receiver } } } -impl Future for Sleep { - type Output = (); +impl<T: std::fmt::Debug + Send> Stream for CompioReceiver<T> { + type Item = T; - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> { - self.inner.as_mut().poll_unpin(cx) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> { + self.receiver.poll_next_unpin(cx) } } -// Safety: There is no way for `Sleep` future to be flipped like a burger to another thread, -// because we create instance of OpenTelemetry SDK runtime in the main thread, and monoio futures don't require Send & Sync bounds. -unsafe impl Send for Sleep {} \ No newline at end of file +impl RuntimeChannel for CompioRuntime { + type Receiver<T: std::fmt::Debug + Send> = CompioReceiver<T>; + type Sender<T: std::fmt::Debug + Send> = CompioSender<T>; + + fn batch_message_channel<T: std::fmt::Debug + Send>( + &self, + _capacity: usize, + ) -> (Self::Sender<T>, Self::Receiver<T>) { + // Use the unbounded channel, this trait is used for batch processing, which naturally will limit the number of messages. + let (sender, receiver) = mpsc::unbounded(); + (CompioSender::new(sender), CompioReceiver::new(receiver)) + } +}
