This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 5cb4a07a initial setup and migration of shards
5cb4a07a is described below
commit 5cb4a07aad3b7f38d76e559ab4065b850567d9e4
Author: numinex <[email protected]>
AuthorDate: Sun May 11 17:17:51 2025 +0200
initial setup and migration of shards
---
Cargo.lock | 114 +++++++++++++++++++-
core/server/Cargo.toml | 2 +
core/server/src/bootstrap.rs | 0
core/server/src/lib.rs | 2 +
core/server/src/shard/connector.rs | 140 +++++++++++++++++++++++++
core/server/src/shard/frame.rs | 68 ++++++++++++
core/server/src/shard/mod.rs | 59 +++++++++++
core/server/src/{lib.rs => shard/namespace.rs} | 48 +++------
8 files changed, 397 insertions(+), 36 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 96dee5e0..e3a0fa12 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,7 +142,7 @@ dependencies = [
"actix-utils",
"futures-core",
"futures-util",
- "mio",
+ "mio 1.0.3",
"socket2",
"tokio",
"tracing",
@@ -615,6 +615,17 @@ dependencies = [
"url",
]
+[[package]]
+name = "auto-const-array"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fd73835ad7deb4bd2b389e6f10333b143f025d607c55ca04c66a0bcc6bb2fc6d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
[[package]]
name = "autocfg"
version = "1.4.0"
@@ -2459,6 +2470,15 @@ dependencies = [
"slab",
]
+[[package]]
+name = "fxhash"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
+dependencies = [
+ "byteorder",
+]
+
[[package]]
name = "generator"
version = "0.8.4"
@@ -3697,6 +3717,16 @@ dependencies = [
"zip",
]
+[[package]]
+name = "io-uring"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "595a0399f411a508feb2ec1e970a4a30c249351e30208960d58298de8660b0e5"
+dependencies = [
+ "bitflags 1.3.2",
+ "libc",
+]
+
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -4146,6 +4176,15 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
+[[package]]
+name = "memoffset"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
+dependencies = [
+ "autocfg",
+]
+
[[package]]
name = "mimalloc"
version = "0.1.46"
@@ -4195,6 +4234,18 @@ dependencies = [
"adler2",
]
+[[package]]
+name = "mio"
+version = "0.8.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
+dependencies = [
+ "libc",
+ "log",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "windows-sys 0.48.0",
+]
+
[[package]]
name = "mio"
version = "1.0.3"
@@ -4255,6 +4306,37 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "monoio"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
+dependencies = [
+ "auto-const-array",
+ "bytes",
+ "fxhash",
+ "io-uring",
+ "libc",
+ "memchr",
+ "mio 0.8.11",
+ "monoio-macros",
+ "nix 0.26.4",
+ "pin-project-lite",
+ "socket2",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
+name = "monoio-macros"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
[[package]]
name = "nanorand"
version = "0.7.0"
@@ -4287,6 +4369,19 @@ version = "6.6.666"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
+[[package]]
+name = "nix"
+version = "0.26.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
+dependencies = [
+ "bitflags 1.3.2",
+ "cfg-if",
+ "libc",
+ "memoffset",
+ "pin-utils",
+]
+
[[package]]
name = "nix"
version = "0.30.1"
@@ -4340,7 +4435,7 @@ dependencies = [
"kqueue",
"libc",
"log",
- "mio",
+ "mio 1.0.3",
"notify-types",
"walkdir",
"windows-sys 0.59.0",
@@ -6162,7 +6257,8 @@ dependencies = [
"mimalloc",
"mockall",
"moka",
- "nix",
+ "monoio",
+ "nix 0.30.1",
"once_cell",
"openssl",
"opentelemetry",
@@ -6181,6 +6277,7 @@ dependencies = [
"serde",
"serde_with",
"serial_test",
+ "sharded_queue",
"static-toml",
"strum",
"sysinfo 0.35.0",
@@ -6232,6 +6329,15 @@ dependencies = [
"lazy_static",
]
+[[package]]
+name = "sharded_queue"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3499be28bd82560e75ad10457698607ba3cc389175ab47ac93279834ce1fab4"
+dependencies = [
+ "crossbeam-utils",
+]
+
[[package]]
name = "shlex"
version = "1.3.0"
@@ -6671,7 +6777,7 @@ dependencies = [
"backtrace",
"bytes",
"libc",
- "mio",
+ "mio 1.0.3",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index e3ba0235..bbc02478 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -60,6 +60,7 @@ jsonwebtoken = "9.3.1"
lending-iterator = "0.1.7"
mimalloc = { workspace = true, optional = true }
moka = { version = "0.12.10", features = ["future"] }
+monoio = "0.2.4"
nix = { version = "0.30", features = ["fs"] }
once_cell = "1.21.3"
openssl = { workspace = true }
@@ -95,6 +96,7 @@ rustls-pemfile = "2.2.0"
serde = { workspace = true }
serde_with = { workspace = true }
static-toml = "1.3.0"
+sharded_queue = "2.0.1"
strum = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
new file mode 100644
index 00000000..e69de29b
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 91d17586..572301e0 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -26,12 +26,14 @@ static GLOBAL: MiMalloc = MiMalloc;
#[cfg(windows)]
compile_error!("iggy-server doesn't support windows.");
+mod bootstrap;
pub mod archiver;
pub mod args;
pub mod binary;
pub mod channels;
pub(crate) mod compat;
pub mod configs;
+pub mod shard;
pub mod http;
pub mod log;
pub mod quic;
diff --git a/core/server/src/shard/connector.rs
b/core/server/src/shard/connector.rs
new file mode 100644
index 00000000..349218ba
--- /dev/null
+++ b/core/server/src/shard/connector.rs
@@ -0,0 +1,140 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use futures::{task::AtomicWaker, Stream};
+use sharded_queue::ShardedQueue;
+use std::{
+ sync::{atomic::AtomicUsize, Arc},
+ task::Poll,
+};
+
+pub type StopSender = flume::Sender<()>;
+pub type StopReceiver = flume::Receiver<()>;
+
+/// Bundle of channel endpoints handed out per connector `id`: a data channel
+/// (`sender` / `receiver`) and a separate stop-signal channel.
+///
+/// `ShardConnector::new` creates `sender` and `receiver` over one shared
+/// `ShardedChannel`, and the stop pair from `flume::bounded(1)`.
+#[derive(Clone)]
+pub struct ShardConnector<T> {
+ /// Identifier passed to `new`; presumably the shard id - confirm with callers.
+ pub id: u16,
+ /// Producer half of the data channel.
+ pub sender: Sender<T>,
+ /// Consumer half of the data channel (a `Stream` of `T`).
+ pub receiver: Receiver<T>,
+ /// Receiving end of the `flume` stop channel.
+ pub stop_receiver: StopReceiver,
+ /// Sending end of the `flume` stop channel.
+ pub stop_sender: StopSender,
+}
+
+// TODO(numinex) - replace flume with async_channel
+impl<T: Clone> ShardConnector<T> {
+    /// Creates a connector identified by `id`.
+    ///
+    /// The data channel is a fresh `ShardedChannel` sized with
+    /// `max_concurrent_thread_count`; the stop channel is a `flume` channel
+    /// bounded to one pending signal.
+    pub fn new(id: u16, max_concurrent_thread_count: usize) -> Self {
+        let (stop_sender, stop_receiver) = flume::bounded(1);
+        let shared = Arc::new(ShardedChannel::new(max_concurrent_thread_count));
+        let (sender, receiver) = shared.unbounded();
+
+        Self {
+            id,
+            sender,
+            receiver,
+            stop_sender,
+            stop_receiver,
+        }
+    }
+
+    /// Forwards `data` to the data channel through this connector's `sender`.
+    pub fn send(&self, data: T) {
+        let producer = &self.sender;
+        producer.send(data);
+    }
+}
+
+/// Consuming half of a `ShardedChannel`; implements `Stream<Item = T>`.
+///
+/// NOTE(review): this type is `Clone`, but the shared channel holds a single
+/// `AtomicWaker`. If clones are polled from different tasks, presumably only
+/// the most recently registered task gets woken - confirm that only one
+/// receiver is ever polled per channel.
+#[derive(Clone)]
+pub struct Receiver<T> {
+ /// Channel state shared with every `Sender` created from the same channel.
+ channel: Arc<ShardedChannel<T>>,
+}
+
+/// Producing half of a `ShardedChannel`. Cloning it clones the inner `Arc`,
+/// so every clone pushes into the same channel.
+#[derive(Clone)]
+pub struct Sender<T> {
+ /// Channel state shared with the `Receiver`.
+ channel: Arc<ShardedChannel<T>>,
+}
+
+impl<T> Sender<T> {
+    /// Enqueues `data` and wakes whichever task is registered on the channel.
+    ///
+    /// The pending-item counter is incremented *before* the push, so the
+    /// consumer can observe a non-zero count while the item is still being
+    /// inserted; presumably that is why the receiving side pops with
+    /// `pop_front_or_spin_wait_item`.
+    pub fn send(&self, data: T) {
+        use std::sync::atomic::Ordering::Relaxed;
+
+        let shared = &*self.channel;
+        shared.task_queue.fetch_add(1, Relaxed);
+        shared.queue.push_back(data);
+        shared.waker.wake();
+    }
+}
+
+/// Shared state behind a `Sender` / `Receiver` pair.
+pub struct ShardedChannel<T> {
+ /// Storage for the items in flight.
+ queue: ShardedQueue<T>,
+ /// Number of items announced by `Sender::send` and not yet taken by
+ /// `Receiver::poll_next` (incremented before the push, decremented before
+ /// the pop).
+ task_queue: AtomicUsize,
+ /// Waker registered by `Receiver::poll_next` and fired by `Sender::send`.
+ waker: AtomicWaker,
+}
+
+impl<T> ShardedChannel<T> {
+    /// Builds an empty channel: a zero item counter, no registered waker, and
+    /// a `ShardedQueue` constructed with `max_concurrent_thread_count`.
+    pub fn new(max_concurrent_thread_count: usize) -> Self {
+        Self {
+            queue: ShardedQueue::new(max_concurrent_thread_count),
+            task_queue: AtomicUsize::new(0),
+            waker: AtomicWaker::new(),
+        }
+    }
+}
+
+/// Splits a shared channel into its producer and consumer handles.
+///
+/// Implemented in this file for `Arc<ShardedChannel<T>>`, where every handle
+/// returned holds a clone of that `Arc`.
+pub trait ShardedChannelsSplit<T: Clone> {
+ /// Returns a `(Sender, Receiver)` pair for this channel.
+ fn unbounded(&self) -> (Sender<T>, Receiver<T>);
+
+ /// Returns an additional `Sender` for this channel.
+ fn sender(&self) -> Sender<T>;
+}
+
+impl<T: Clone> ShardedChannelsSplit<T> for Arc<ShardedChannel<T>> {
+    /// Hands out one sender and one receiver, both pointing at this channel.
+    fn unbounded(&self) -> (Sender<T>, Receiver<T>) {
+        let producer = self.sender();
+        let channel = Arc::clone(self);
+        (producer, Receiver { channel })
+    }
+
+    /// Hands out a sender that shares ownership of this channel.
+    fn sender(&self) -> Sender<T> {
+        let channel = Arc::clone(self);
+        Sender { channel }
+    }
+}
+
+impl<T> Stream for Receiver<T> {
+    type Item = T;
+
+    /// Yields the next queued item, or parks the calling task until
+    /// `Sender::send` wakes it.
+    ///
+    /// This stream never ends: it returns either `Poll::Pending` or
+    /// `Poll::Ready(Some(item))`, never `Poll::Ready(None)`.
+    ///
+    /// NOTE(review): the counter is read and then decremented in two separate
+    /// steps, which is only sound with a single consumer polling the channel -
+    /// confirm, since `Receiver` is `Clone`.
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        use std::sync::atomic::Ordering::Relaxed;
+
+        let channel = &self.channel;
+
+        // Fast path: skip waker registration when an item is already announced.
+        if channel.task_queue.load(Relaxed) == 0 {
+            // Register first, then look at the counter again. Registering only
+            // after a failed check leaves a window in which `send` bumps the
+            // counter and calls `wake()` while no waker is stored, so the
+            // notification is lost and this task would sleep forever.
+            channel.waker.register(cx.waker());
+            if channel.task_queue.load(Relaxed) == 0 {
+                return Poll::Pending;
+            }
+        }
+
+        channel.task_queue.fetch_sub(1, Relaxed);
+        // The sender increments the counter before pushing, so the item may
+        // not be in the queue yet; this call spins until it shows up.
+        let item = channel.queue.pop_front_or_spin_wait_item();
+        Poll::Ready(Some(item))
+    }
+}
diff --git a/core/server/src/shard/frame.rs b/core/server/src/shard/frame.rs
new file mode 100644
index 00000000..b1524ad6
--- /dev/null
+++ b/core/server/src/shard/frame.rs
@@ -0,0 +1,68 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use bytes::Bytes;
+use flume::Sender;
+use iggy_common::IggyError;
+
+/// Payload carried by a `ShardFrame`.
+///
+/// Only the `Event` variant exists for now; the `Command` variant is
+/// commented out pending the TODO below.
+#[derive(Debug, Clone)]
+pub enum ShardMessage {
+ //TODO: Fixme
+ //Command(ServerCommand),
+ /// Wraps a `ShardEvent`.
+ Event(ShardEvent),
+}
+
+/// Placeholder for events exchanged between shards.
+///
+/// The enum has no variants yet, so no value of this type can exist and
+/// `ShardMessage::Event` cannot be constructed until variants are added.
+#[derive(Debug, Clone)]
+pub enum ShardEvent {
+}
+
+/// Result sent back through a frame's `response_sender`.
+///
+/// Derives only `Debug` (not `Clone`). The `handle_response!` macro in this
+/// file maps each variant to a call on a response sender.
+#[derive(Debug)]
+pub enum ShardResponse {
+ /// Successful outcome carrying the raw response payload.
+ BinaryResponse(Bytes),
+ /// Failed outcome carrying the error to report.
+ ErrorResponse(IggyError),
+}
+
+/// Envelope for one message, together with an optional reply channel.
+#[derive(Debug, Clone)]
+pub struct ShardFrame {
+ /// Client identifier supplied by whoever builds the frame.
+ pub client_id: u32,
+ /// The message itself.
+ pub message: ShardMessage,
+ /// `flume` sender on which a `ShardResponse` can be returned; `None`
+ /// presumably means no reply is expected - confirm with callers.
+ pub response_sender: Option<Sender<ShardResponse>>,
+}
+
+impl ShardFrame {
+    /// Assembles a frame from its three parts; no validation is performed.
+    pub fn new(
+        client_id: u32,
+        message: ShardMessage,
+        response_sender: Option<Sender<ShardResponse>>,
+    ) -> Self {
+        ShardFrame { client_id, message, response_sender }
+    }
+}
+
+/// Dispatches a `ShardResponse` to a response sender.
+///
+/// Expands to a `match` on `$response` that calls
+/// `$sender.send_ok_response(&payload).await?` for `BinaryResponse` and
+/// `$sender.send_error_response(err).await?` for `ErrorResponse`.
+///
+/// Because the expansion contains `.await` and `?`, it must be invoked inside
+/// an async context whose return type accepts the propagated error. The
+/// `ShardResponse` path is unqualified, so the type has to be in scope at the
+/// call site.
+#[macro_export]
+macro_rules! handle_response {
+ ($sender:expr, $response:expr) => {
+ match $response {
+ ShardResponse::BinaryResponse(payload) =>
$sender.send_ok_response(&payload).await?,
+ ShardResponse::ErrorResponse(err) =>
$sender.send_error_response(err).await?,
+ }
+ };
+}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
new file mode 100644
index 00000000..6ccbcab3
--- /dev/null
+++ b/core/server/src/shard/mod.rs
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+mod connector;
+mod namespace;
+mod frame;
+
+
+
+use std::cell::RefCell;
+use ahash::HashMap;
+use connector::ShardConnector;
+use frame::ShardFrame;
+use namespace::IggyNamespace;
+/// Module-private handle pairing an id with the `ShardConnector` that carries
+/// `ShardFrame`s; `IggyShard` keeps a `Vec` of these.
+struct Shard {
+ /// Identifier of this entry; presumably the shard id - confirm.
+ id: u16,
+ /// Channel endpoints for exchanging `ShardFrame`s.
+ connection: ShardConnector<ShardFrame>,
+}
+
+/// Value type of `IggyShard::shards_table`, keyed by `IggyNamespace`.
+struct ShardInfo {
+ /// Identifier stored for the namespace; presumably the id of the shard
+ /// that owns it - confirm.
+ id: u16,
+}
+
+/// State held by one shard: its id, the list of `Shard` handles, the
+/// namespace-to-shard table, and the server components listed below.
+///
+/// NOTE(review): the `use` list of this module only brings in `RefCell`,
+/// `HashMap`, `ShardConnector`, `ShardFrame` and `IggyNamespace`. The other
+/// types named here (`Rc`, `Cell`, `RwLock`, `Permissioner`, `SystemStorage`,
+/// `Stream`, `UserId`, `User`, `FileState`, `Encryptor`, `ServerConfig`,
+/// `ClientManager`, `Session`, `Metrics`, `Receiver`, `StopReceiver`,
+/// `StopSender`) still need to be imported before this compiles.
+pub struct IggyShard {
+ /// Identifier of this shard.
+ pub id: u16,
+ /// Handles with a `ShardConnector` each; presumably one per shard - confirm.
+ shards: Vec<Shard>,
+ /// Maps a (stream, topic, partition) namespace to its `ShardInfo`.
+ shards_table: RefCell<HashMap<IggyNamespace, ShardInfo>>,
+
+ pub(crate) permissioner: RefCell<Permissioner>,
+ pub(crate) storage: Rc<SystemStorage>,
+ // NOTE(review): `streams` is the only field behind an `RwLock`; the other
+ // mutable fields use `RefCell` - confirm this is intentional.
+ pub(crate) streams: RwLock<HashMap<u32, Stream>>,
+ pub(crate) streams_ids: RefCell<HashMap<String, u32>>,
+ pub(crate) users: RefCell<HashMap<UserId, User>>,
+
+ // TODO - get rid of this dynamic dispatch.
+ // NOTE(review): the only `dyn` in this struct is `encryptor`, so the TODO
+ // above presumably refers to it rather than to `state`.
+ pub(crate) state: Rc<FileState>,
+ pub(crate) encryptor: Option<Rc<dyn Encryptor>>,
+ pub(crate) config: ServerConfig,
+ pub(crate) client_manager: RefCell<ClientManager>,
+ pub(crate) active_sessions: RefCell<Vec<Session>>,
+ pub(crate) metrics: Metrics,
+ /// Receiver for incoming `ShardFrame`s; the `Cell<Option<..>>` wrapper
+ /// presumably lets it be taken out exactly once - confirm.
+ pub message_receiver: Cell<Option<Receiver<ShardFrame>>>,
+ stop_receiver: StopReceiver,
+ stop_sender: StopSender,
+}
\ No newline at end of file
diff --git a/core/server/src/lib.rs b/core/server/src/shard/namespace.rs
similarity index 51%
copy from core/server/src/lib.rs
copy to core/server/src/shard/namespace.rs
index 91d17586..4b972125 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/shard/namespace.rs
@@ -15,39 +15,23 @@
* specific language governing permissions and limitations
* under the License.
*/
+/// Identifies a partition by its stream, topic and partition ids.
+///
+/// Derives `Eq` and `Hash`, so it can be used as a hash-map key.
+#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+pub struct IggyNamespace {
+ /// Id of the stream.
+ pub(crate) stream_id: u32,
+ /// Id of the topic.
+ pub(crate) topic_id: u32,
+ /// Id of the partition.
+ pub(crate) partition_id: u32,
+}
-#[cfg(not(feature = "disable-mimalloc"))]
-use mimalloc::MiMalloc;
-
-#[cfg(not(feature = "disable-mimalloc"))]
-#[global_allocator]
-static GLOBAL: MiMalloc = MiMalloc;
-
-#[cfg(windows)]
-compile_error!("iggy-server doesn't support windows.");
-
-pub mod archiver;
-pub mod args;
-pub mod binary;
-pub mod channels;
-pub(crate) mod compat;
-pub mod configs;
-pub mod http;
-pub mod log;
-pub mod quic;
-pub mod server_error;
-pub mod state;
-pub mod streaming;
-pub mod tcp;
-pub mod versioning;
-
-const VERSION: &str = env!("CARGO_PKG_VERSION");
-const IGGY_ROOT_USERNAME_ENV: &str = "IGGY_ROOT_USERNAME";
-const IGGY_ROOT_PASSWORD_ENV: &str = "IGGY_ROOT_PASSWORD";
+impl IggyNamespace {
+ /// Builds a namespace from the given stream, topic and partition ids.
+ pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self {
+ Self {
+ stream_id,
+ topic_id,
+ partition_id,
+ }
+ }
-pub(crate) fn map_toggle_str<'a>(enabled: bool) -> &'a str {
- match enabled {
- true => "enabled",
- false => "disabled",
+ /// Not implemented yet: the body is `todo!()`, so calling this panics.
+ pub fn generate_hash(&self) -> u32 {
+ todo!();
}
}