This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new d5da51de feat(io_uring): replace tokio s3 crate (#2020)
d5da51de is described below

commit d5da51de4596663adc0966bce96ab05009a7d101
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jul 18 16:15:37 2025 +0200

    feat(io_uring): replace tokio s3 crate (#2020)
---
 Cargo.lock                                         | 523 +++++++++++++--------
 Cargo.toml                                         |   1 +
 core/configs/server.toml                           |   8 +-
 core/integration/tests/archiver/disk.rs            |  33 +-
 core/integration/tests/archiver/mod.rs             |   3 +-
 core/integration/tests/archiver/s3.rs              |  38 --
 core/integration/tests/state/mod.rs                |   2 +-
 .../integration/tests/streaming/consumer_offset.rs |   2 +-
 core/integration/tests/streaming/partition.rs      |   2 +-
 core/integration/tests/streaming/segment.rs        |   2 +-
 core/integration/tests/streaming/stream.rs         |   2 +-
 core/integration/tests/streaming/topic.rs          |   2 +-
 core/server/Cargo.toml                             |   5 +-
 core/server/src/archiver/disk.rs                   |  16 +-
 core/server/src/archiver/mod.rs                    |   5 +
 core/server/src/archiver/s3.rs                     | 207 ++++++--
 core/server/src/channels/commands/mod.rs           |   1 -
 core/server/src/channels/handler.rs                |  24 +-
 core/server/src/channels/server_command.rs         |  15 +-
 core/server/src/main.rs                            |  27 ++
 core/server/src/server_error.rs                    |   8 +
 core/server/src/shard/builder.rs                   |   9 +
 core/server/src/shard/mod.rs                       |  16 +-
 .../tasks/auxilary}/maintain_messages.rs           | 342 +++++---------
 core/server/src/shard/tasks/auxilary/mod.rs        |   1 +
 core/server/src/shard/tasks/mod.rs                 |   1 +
 .../server/src/streaming/partitions/persistence.rs |   2 +-
 .../src/streaming/segments/writing_messages.rs     |   4 +-
 core/server/src/streaming/utils/file.rs            |   4 -
 29 files changed, 735 insertions(+), 570 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index fce0c066..2a32b031 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -307,6 +307,15 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "aligned-array"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c"
+dependencies = [
+ "generic-array",
+]
+
 [[package]]
 name = "alloc-no-stdlib"
 version = "2.0.4"
@@ -643,44 +652,12 @@ version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
 
-[[package]]
-name = "attohttpc"
-version = "0.28.5"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "07a9b245ba0739fc90935094c29adbaee3f977218b5fb95e822e261cda7f56a3"
-dependencies = [
- "http 1.3.1",
- "log",
- "rustls",
- "serde",
- "serde_json",
- "url",
- "webpki-roots 0.26.11",
-]
-
 [[package]]
 name = "autocfg"
 version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
 
-[[package]]
-name = "aws-creds"
-version = "0.38.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ba912106484991c456adb3364338a2534d0818bd9374b324b608074e3b55f581"
-dependencies = [
- "attohttpc",
- "home",
- "log",
- "quick-xml 0.32.0",
- "rust-ini",
- "serde",
- "thiserror 1.0.69",
- "time",
- "url",
-]
-
 [[package]]
 name = "aws-lc-rs"
 version = "1.13.1"
@@ -704,15 +681,6 @@ dependencies = [
  "fs_extra",
 ]
 
-[[package]]
-name = "aws-region"
-version = "0.26.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "73ae4ae7c45238b60af0a3b27ef2fcc7bd5b8fdcd8a6d679919558b40d3eff7a"
-dependencies = [
- "thiserror 1.0.69",
-]
-
 [[package]]
 name = "axum"
 version = "0.7.9"
@@ -1317,15 +1285,6 @@ dependencies = [
  "thiserror 2.0.12",
 ]
 
-[[package]]
-name = "castaway"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5"
-dependencies = [
- "rustversion",
-]
-
 [[package]]
 name = "cc"
 version = "1.2.27"
@@ -1533,16 +1492,23 @@ dependencies = [
 ]
 
 [[package]]
-name = "compact_str"
-version = "0.7.1"
+name = "compio"
+version = "0.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f"
+checksum = "713c6293af093c202ad318e8f7bdc1de1a36d7a793bb77f7fc6bd6f1788659a9"
 dependencies = [
- "castaway",
- "cfg-if",
- "itoa",
- "ryu",
- "static_assertions",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-dispatcher",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-fs 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-process",
+ "compio-quic",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-signal",
+ "compio-tls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -1550,15 +1516,26 @@ name = "compio"
 version = "0.15.0"
 source = "git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be"
 dependencies = [
- "compio-buf",
- "compio-driver",
- "compio-fs",
- "compio-io",
- "compio-log",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-fs 0.8.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-log 0.1.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "compio-macros",
- "compio-net",
- "compio-runtime",
- "compio-tls",
+ "compio-net 0.8.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-runtime 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-tls 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+]
+
+[[package]]
+name = "compio-buf"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3ce94a45a47ef8c0e3f44084fe67c8effc25e7ac1de6de2ee1a29a59e6c6ba8e"
+dependencies = [
+ "arrayvec",
+ "bytes",
+ "libc",
 ]
 
 [[package]]
@@ -1571,6 +1548,43 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "compio-dispatcher"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0cdf8c613be826be410d8744ab30acc49cc5134a78e2aa25efae9efa44bed6a7"
+dependencies = [
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "flume",
+ "futures-channel",
+]
+
+[[package]]
+name = "compio-driver"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "737212fe00b4af769f7e8f156c25ffafd5888d4d21834e100ea068dea1086ef8"
+dependencies = [
+ "aligned-array",
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel",
+ "crossbeam-queue",
+ "futures-util",
+ "io-uring",
+ "io_uring_buf_ring",
+ "libc",
+ "once_cell",
+ "paste",
+ "polling",
+ "slab",
+ "socket2 0.5.10",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "compio-driver"
 version = "0.8.1"
@@ -1578,8 +1592,8 @@ source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
 dependencies = [
  "cfg-if",
  "cfg_aliases",
- "compio-buf",
- "compio-log",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-log 0.1.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "crossbeam-channel",
  "crossbeam-queue",
  "futures-util",
@@ -1594,6 +1608,24 @@ dependencies = [
  "windows-sys 0.60.2",
 ]
 
+[[package]]
+name = "compio-fs"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9bcf65e631d521c666bca25595f8e5c78173e96f0b3b61f0a7d93f31d9661d32"
+dependencies = [
+ "cfg-if",
+ "cfg_aliases",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "libc",
+ "os_pipe",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "compio-fs"
 version = "0.8.0"
@@ -1601,27 +1633,48 @@ source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
 dependencies = [
  "cfg-if",
  "cfg_aliases",
- "compio-buf",
- "compio-driver",
- "compio-io",
- "compio-runtime",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-runtime 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "libc",
  "os_pipe",
  "widestring",
  "windows-sys 0.60.2",
 ]
 
+[[package]]
+name = "compio-io"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c2b05cc4142659f2c90b6e44c68568ff71c83c6fb9285aca686952250b914932"
+dependencies = [
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util",
+ "paste",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "compio-io"
 version = "0.7.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
- "compio-buf",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "futures-util",
  "paste",
  "pin-project-lite",
 ]
 
+[[package]]
+name = "compio-log"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fc4e560213c1996b618da369b7c9109564b41af9033802ae534465c4ee4e132f"
+dependencies = [
+ "tracing",
+]
+
 [[package]]
 name = "compio-log"
 version = "0.1.0"
@@ -1641,16 +1694,35 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "compio-net"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0c1fabe3393bc0c3a0dca8e99a35bf97e42caa12bb3cc6bba83df04e28c9c142"
+dependencies = [
+ "cfg-if",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "either",
+ "libc",
+ "once_cell",
+ "socket2 0.5.10",
+ "widestring",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "compio-net"
 version = "0.8.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
  "cfg-if",
- "compio-buf",
- "compio-driver",
- "compio-io",
- "compio-runtime",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-runtime 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "either",
  "libc",
  "once_cell",
@@ -1659,6 +1731,64 @@ dependencies = [
  "windows-sys 0.60.2",
 ]
 
+[[package]]
+name = "compio-process"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3867cfe7b23eaae89ff815aba4fdde61cb6fd55f81fd368128300c6b7e645016"
+dependencies = [
+ "cfg-if",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-quic"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6f107e044329f1e171930801b09bfc6e764c5e171e45c7a3e382f98561da619a"
+dependencies = [
+ "cfg_aliases",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-net 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "flume",
+ "futures-util",
+ "libc",
+ "quinn-proto",
+ "rustc-hash 2.1.1",
+ "rustls",
+ "thiserror 2.0.12",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-runtime"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b7df559e87b7ab05ba61c32619f6076dd5cc2daf5a8cb30cb9931fb355d20aff"
+dependencies = [
+ "async-task",
+ "cfg-if",
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-log 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-queue",
+ "futures-util",
+ "libc",
+ "once_cell",
+ "scoped-tls",
+ "slab",
+ "socket2 0.5.10",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "compio-runtime"
 version = "0.8.1"
@@ -1666,9 +1796,9 @@ source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd
 dependencies = [
  "async-task",
  "cfg-if",
- "compio-buf",
- "compio-driver",
- "compio-log",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-driver 0.8.1 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-log 0.1.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "core_affinity",
  "crossbeam-queue",
  "futures-util",
@@ -1680,13 +1810,40 @@ dependencies = [
  "windows-sys 0.60.2",
 ]
 
+[[package]]
+name = "compio-signal"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "03d2931880b03b33d4df7d2b8a008e93731366d185358c7442fc8d24d5f9c1bd"
+dependencies = [
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-driver 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-runtime 0.8.1 
(registry+https://github.com/rust-lang/crates.io-index)",
+ "libc",
+ "once_cell",
+ "os_pipe",
+ "slab",
+ "windows-sys 0.52.0",
+]
+
+[[package]]
+name = "compio-tls"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "542bb0e0f6f65cb84bc09b7e052fa54f006d1ba228a8dfad6d7b9676defe7232"
+dependencies = [
+ "compio-buf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "compio-io 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rustls",
+]
+
 [[package]]
 name = "compio-tls"
 version = "0.6.0"
 source = 
"git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be#fe4243f0b6811ebc325afd081c9b087b4d9817be";
 dependencies = [
- "compio-buf",
- "compio-io",
+ "compio-buf 0.6.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
+ "compio-io 0.7.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "rustls",
 ]
 
@@ -2102,6 +2259,46 @@ dependencies = [
  "regex-syntax 0.7.5",
 ]
 
+[[package]]
+name = "cyper"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "89b65af5073b4f53c9697b611b414042e71c6a14e11088438a67b1ef36f51ca2"
+dependencies = [
+ "async-stream",
+ "base64 0.22.1",
+ "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "cyper-core",
+ "encoding_rs",
+ "futures-util",
+ "http 1.3.1",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
+ "mime",
+ "send_wrapper",
+ "serde",
+ "serde_urlencoded",
+ "thiserror 2.0.12",
+ "url",
+]
+
+[[package]]
+name = "cyper-core"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6343deaa569c748860d9afefab636648e7e6f9abdfc26b3b9dde327170ae6b2b"
+dependencies = [
+ "cfg-if",
+ "compio 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util",
+ "hyper",
+ "hyper-util",
+ "rustls-platform-verifier 0.6.0",
+ "send_wrapper",
+ "tower-service",
+]
+
 [[package]]
 name = "darling"
 version = "0.20.11"
@@ -4394,7 +4591,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "chrono",
- "compio",
+ "compio 0.15.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "ctor",
  "derive_more 2.0.1",
  "env_logger",
@@ -4986,17 +5183,6 @@ version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
 
-[[package]]
-name = "maybe-async"
-version = "0.2.10"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.104",
-]
-
 [[package]]
 name = "md-5"
 version = "0.10.6"
@@ -5007,12 +5193,6 @@ dependencies = [
  "digest",
 ]
 
-[[package]]
-name = "md5"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
-
 [[package]]
 name = "memchr"
 version = "2.7.5"
@@ -5066,15 +5246,6 @@ dependencies = [
  "unicase",
 ]
 
-[[package]]
-name = "minidom"
-version = "0.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d"
-dependencies = [
- "rxml",
-]
-
 [[package]]
 name = "minimal-lexical"
 version = "0.2.1"
@@ -6292,19 +6463,9 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.32.0"
+version = "0.37.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2"
-dependencies = [
- "memchr",
- "serde",
-]
-
-[[package]]
-name = "quick-xml"
-version = "0.36.2"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
+checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
 dependencies = [
  "memchr",
  "serde",
@@ -6345,7 +6506,7 @@ dependencies = [
  "rustc-hash 2.1.1",
  "rustls",
  "rustls-pki-types",
- "rustls-platform-verifier",
+ "rustls-platform-verifier 0.5.3",
  "slab",
  "thiserror 2.0.12",
  "tinyvec",
@@ -6633,14 +6794,12 @@ dependencies = [
  "sync_wrapper",
  "tokio",
  "tokio-rustls",
- "tokio-util",
  "tower 0.5.2",
  "tower-http",
  "tower-service",
  "url",
  "wasm-bindgen",
  "wasm-bindgen-futures",
- "wasm-streams",
  "web-sys",
  "webpki-roots 1.0.1",
 ]
@@ -6783,40 +6942,6 @@ dependencies = [
  "trim-in-place",
 ]
 
-[[package]]
-name = "rust-s3"
-version = "0.36.0-beta.2"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "cc7d6f3a3dd397743e8f344ffc80ea7137aee423983ae25b512e5332ad11362f"
-dependencies = [
- "async-trait",
- "aws-creds",
- "aws-region",
- "base64 0.22.1",
- "bytes",
- "cfg-if",
- "futures",
- "hex",
- "hmac",
- "http 1.3.1",
- "log",
- "maybe-async",
- "md5",
- "minidom",
- "percent-encoding",
- "quick-xml 0.36.2",
- "reqwest",
- "serde",
- "serde_derive",
- "serde_json",
- "sha2",
- "thiserror 1.0.69",
- "time",
- "tokio",
- "tokio-stream",
- "url",
-]
-
 [[package]]
 name = "rust_decimal"
 version = "1.37.2"
@@ -6954,6 +7079,27 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "rustls-platform-verifier"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "eda84358ed17f1f354cf4b1909ad346e6c7bc2513e8c40eb08e0157aa13a9070"
+dependencies = [
+ "core-foundation",
+ "core-foundation-sys",
+ "jni",
+ "log",
+ "once_cell",
+ "rustls",
+ "rustls-native-certs",
+ "rustls-platform-verifier-android",
+ "rustls-webpki",
+ "security-framework",
+ "security-framework-sys",
+ "webpki-root-certs 1.0.1",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "rustls-platform-verifier-android"
 version = "0.1.1"
@@ -6979,22 +7125,22 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
 
 [[package]]
-name = "rxml"
-version = "0.11.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee"
-dependencies = [
- "bytes",
- "rxml_validation",
-]
-
-[[package]]
-name = "rxml_validation"
-version = "0.11.0"
+name = "rusty-s3"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4"
+checksum = "8f51a5a6b15f25d3e10c068039ee13befb6110fcb36c2b26317bcbdc23484d96"
 dependencies = [
- "compact_str",
+ "base64 0.22.1",
+ "hmac",
+ "md-5",
+ "percent-encoding",
+ "quick-xml",
+ "serde",
+ "serde_json",
+ "sha2",
+ "time",
+ "url",
+ "zeroize",
 ]
 
 [[package]]
@@ -7128,6 +7274,15 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "send_wrapper"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
+dependencies = [
+ "futures-core",
+]
+
 [[package]]
 name = "serde"
 version = "1.0.219"
@@ -7300,10 +7455,11 @@ dependencies = [
  "bytes",
  "chrono",
  "clap",
- "compio",
+ "compio 0.15.0 
(git+https://github.com/compio-rs/compio.git?rev=fe4243f0b6811ebc325afd081c9b087b4d9817be)",
  "console-subscriber",
  "crossbeam",
  "ctrlc",
+ "cyper",
  "dashmap",
  "derive_more 2.0.1",
  "dotenvy",
@@ -7332,9 +7488,9 @@ dependencies = [
  "quinn",
  "reqwest",
  "ring",
- "rust-s3",
  "rustls",
  "rustls-pemfile",
+ "rusty-s3",
  "serde",
  "serde_with",
  "serial_test",
@@ -7789,12 +7945,6 @@ dependencies = [
  "toml",
 ]
 
-[[package]]
-name = "static_assertions"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
-
 [[package]]
 name = "stringprep"
 version = "0.1.5"
@@ -8914,19 +9064,6 @@ dependencies = [
  "unicode-ident",
 ]
 
-[[package]]
-name = "wasm-streams"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
-dependencies = [
- "futures-util",
- "js-sys",
- "wasm-bindgen",
- "wasm-bindgen-futures",
- "web-sys",
-]
-
 [[package]]
 name = "wasm-timer"
 version = "0.2.5"
diff --git a/Cargo.toml b/Cargo.toml
index 411d75b9..1faea63f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,6 +120,7 @@ tempfile = "3.20.0"
 thiserror = "2.0.12"
 tokio = { version = "1.45.1", features = ["full"] }
 compio =  { git = "https://github.com/compio-rs/compio.git";, rev = 
"fe4243f0b6811ebc325afd081c9b087b4d9817be", features = ["runtime", "macros", 
"io-uring", "time", "rustls"] }
+cyper = { version = "0.4.0", features = ["rustls"], default-features = false}
 tokio-rustls = "0.26.2"
 toml = "0.8.23"
 tracing = "0.1.41"
diff --git a/core/configs/server.toml b/core/configs/server.toml
index e235baa8..7aba9ec1 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -17,7 +17,7 @@
 
 [data_maintenance.archiver]
 # Enables or disables the archiver process.
-enabled = false
+enabled = true
 
 # Kind of archiver to use. Available options: "disk".
 kind = "disk"
@@ -47,13 +47,13 @@ tmp_upload_dir = "local_data/s3_tmp"
 
 [data_maintenance.messages]
 # Enables or disables the archiver process for closed segments containing messages.
-archiver_enabled = false
+archiver_enabled = true
 
 # Enables or disables the expired message cleaner process.
 cleaner_enabled = false
 
 # Interval for running the message archiver and cleaner.
-interval = "1 m"
+interval = "35s"
 
 [data_maintenance.state]
 # Enables or disables the archiver process for state log.
@@ -549,4 +549,4 @@ bucket_capacity = 8192
 # - "all": Use all available CPU cores (default)
 # - numeric value (e.g. 4): Use 4 shards (4 threads pinned to cores 0, 1, 2, 3)
 # - range (e.g. "5..8"): Use 3 shards with affinity to cores 5, 6, 7
-cpu_allocation = "all"
+cpu_allocation = "4"
diff --git a/core/integration/tests/archiver/disk.rs b/core/integration/tests/archiver/disk.rs
index 00901190..d99ce8ef 100644
--- a/core/integration/tests/archiver/disk.rs
+++ b/core/integration/tests/archiver/disk.rs
@@ -17,12 +17,12 @@
  */
 
 use crate::archiver::DiskArchiverSetup;
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
 use server::streaming::utils::file;
 use server::{archiver::Archiver, server_error::ArchiverError};
 use std::path::Path;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
-#[tokio::test]
+#[compio::test]
 async fn should_init_base_archiver_directory() {
     let setup = DiskArchiverSetup::init().await;
     let archiver = setup.archiver();
@@ -32,7 +32,7 @@ async fn should_init_base_archiver_directory() {
     assert!(path.exists());
 }
 
-#[tokio::test]
+#[compio::test]
 async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() {
     let setup = DiskArchiverSetup::init().await;
     let archiver = setup.archiver();
@@ -47,7 +47,7 @@ async fn should_archive_file_on_disk_by_making_a_copy_of_original_file() {
     assert_archived_file(&file_to_archive_path, &archived_file_path, 
content).await;
 }
 
-#[tokio::test]
+#[compio::test]
 async fn should_archive_file_on_disk_within_additional_base_directory() {
     let setup = DiskArchiverSetup::init().await;
     let archiver = setup.archiver();
@@ -68,7 +68,7 @@ async fn 
should_archive_file_on_disk_within_additional_base_directory() {
     assert_archived_file(&file_to_archive_path, &archived_file_path, 
content).await;
 }
 
-#[tokio::test]
+#[compio::test]
 async fn should_return_true_when_file_is_archived() {
     let setup = DiskArchiverSetup::init().await;
     let archiver = setup.archiver();
@@ -83,7 +83,7 @@ async fn should_return_true_when_file_is_archived() {
     assert!(is_archived.unwrap());
 }
 
-#[tokio::test]
+#[compio::test]
 async fn should_return_false_when_file_is_not_archived() {
     let setup = DiskArchiverSetup::init().await;
     let archiver = setup.archiver();
@@ -96,7 +96,7 @@ async fn should_return_false_when_file_is_not_archived() {
     assert!(!is_archived.unwrap());
 }
 
-#[tokio::test]
+#[compio::test]
 async fn should_fail_when_file_to_archive_does_not_exist() {
     let setup = DiskArchiverSetup::init().await;
     let archiver = setup.archiver();
@@ -110,26 +110,23 @@ async fn 
should_fail_when_file_to_archive_does_not_exist() {
 }
 
 async fn create_file(path: &str, content: &str) {
-    // TODO: Fixme
-    /*
     let mut file = file::overwrite(path).await.unwrap();
-    file.write_all(content.as_bytes()).await.unwrap();
-    */
+    let content = content.as_bytes().to_vec();
+    file.write_all_at(content, 0).await.unwrap();
 }
 
 async fn assert_archived_file(file_to_archive_path: &str, archived_file_path: 
&str, content: &str) {
-    // TODO: Fixme
-    /*
     assert!(Path::new(&file_to_archive_path).exists());
     assert!(Path::new(&archived_file_path).exists());
     let archived_file = file::open(archived_file_path).await;
     assert!(archived_file.is_ok());
-    let mut archived_file = archived_file.unwrap();
-    let mut archived_file_content = String::new();
-    archived_file
-        .read_to_string(&mut archived_file_content)
+    let archived_file = archived_file.unwrap();
+    let len = archived_file.metadata().await.unwrap().len();
+    let archived_file_content = Vec::with_capacity(len as usize);
+    let (_, archived_file_content) = archived_file
+        .read_exact_at(archived_file_content, 0)
         .await
         .unwrap();
+    let archived_file_content = String::from_utf8(archived_file_content).unwrap();
     assert_eq!(content, archived_file_content);
-    */
 }
diff --git a/core/integration/tests/archiver/mod.rs 
b/core/integration/tests/archiver/mod.rs
index 2e3ed94c..704936e9 100644
--- a/core/integration/tests/archiver/mod.rs
+++ b/core/integration/tests/archiver/mod.rs
@@ -16,13 +16,12 @@
  * under the License.
  */
 
+use compio::fs::create_dir;
 use server::archiver::disk::DiskArchiver;
 use server::configs::server::DiskArchiverConfig;
-use tokio::fs::create_dir;
 use uuid::Uuid;
 
 mod disk;
-mod s3;
 
 pub struct DiskArchiverSetup {
     base_path: String,
diff --git a/core/integration/tests/archiver/s3.rs 
b/core/integration/tests/archiver/s3.rs
deleted file mode 100644
index df2970f1..00000000
--- a/core/integration/tests/archiver/s3.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use server::archiver::Archiver;
-use server::archiver::s3::S3Archiver;
-use server::configs::server::S3ArchiverConfig;
-
-#[tokio::test]
-async fn should_not_be_initialized_given_invalid_configuration() {
-    let config = S3ArchiverConfig {
-        key_id: "test".to_owned(),
-        key_secret: "secret".to_owned(),
-        bucket: "iggy".to_owned(),
-        endpoint: Some("https://iggy.s3.com".to_owned()),
-        region: None,
-        tmp_upload_dir: "tmp".to_owned(),
-    };
-    let archiver = S3Archiver::new(config);
-    assert!(archiver.is_ok());
-    let archiver = archiver.unwrap();
-    let init = archiver.init().await;
-    assert!(init.is_err());
-}
diff --git a/core/integration/tests/state/mod.rs 
b/core/integration/tests/state/mod.rs
index 2071587f..fe496abd 100644
--- a/core/integration/tests/state/mod.rs
+++ b/core/integration/tests/state/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use compio::fs::create_dir;
 use iggy::prelude::{Aes256GcmEncryptor, EncryptorKind};
 use server::bootstrap::create_directories;
 use server::state::file::FileState;
@@ -24,7 +25,6 @@ use server::streaming::utils::file::overwrite;
 use server::versioning::SemanticVersion;
 use std::str::FromStr;
 use std::sync::Arc;
-use compio::fs::create_dir;
 use uuid::Uuid;
 
 mod file;
diff --git a/core/integration/tests/streaming/consumer_offset.rs 
b/core/integration/tests/streaming/consumer_offset.rs
index 84fc9749..db89169b 100644
--- a/core/integration/tests/streaming/consumer_offset.rs
+++ b/core/integration/tests/streaming/consumer_offset.rs
@@ -17,12 +17,12 @@
  */
 
 use crate::streaming::common::test_setup::TestSetup;
+use compio::fs;
 use iggy::prelude::ConsumerKind;
 use server::configs::system::SystemConfig;
 use server::streaming::partitions::partition::ConsumerOffset;
 use server::streaming::storage::PartitionStorageKind;
 use std::sync::Arc;
-use compio::fs;
 
 #[compio::test]
 async fn should_persist_consumer_offsets_and_then_load_them_from_disk() {
diff --git a/core/integration/tests/streaming/partition.rs 
b/core/integration/tests/streaming/partition.rs
index 828532f9..ac657e3b 100644
--- a/core/integration/tests/streaming/partition.rs
+++ b/core/integration/tests/streaming/partition.rs
@@ -18,13 +18,13 @@
 
 use crate::streaming::common::test_setup::TestSetup;
 use crate::streaming::create_messages;
+use compio::fs;
 use iggy::prelude::{IggyExpiry, IggyTimestamp, Sizeable};
 use server::state::system::PartitionState;
 use server::streaming::partitions::partition::Partition;
 use server::streaming::segments::*;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64};
-use compio::fs;
 
 #[compio::test]
 async fn should_persist_partition_with_segment() {
diff --git a/core/integration/tests/streaming/segment.rs 
b/core/integration/tests/streaming/segment.rs
index 0ee68893..4e097b8f 100644
--- a/core/integration/tests/streaming/segment.rs
+++ b/core/integration/tests/streaming/segment.rs
@@ -347,7 +347,7 @@ async fn should_delete_persisted_segments() -> Result<(), 
Box<dyn std::error::Er
     shard.add_active_session(session.clone());
 
     let id = shard
-        .create_stream_bypass_auth( Some(stream_id.get_u32_value()?), 
stream_name)
+        .create_stream_bypass_auth(Some(stream_id.get_u32_value()?), 
stream_name)
         .unwrap();
     let stream = shard.get_stream(&Identifier::numeric(id).unwrap()).unwrap();
     stream.persist();
diff --git a/core/integration/tests/streaming/stream.rs 
b/core/integration/tests/streaming/stream.rs
index ff360632..8d7b7cb8 100644
--- a/core/integration/tests/streaming/stream.rs
+++ b/core/integration/tests/streaming/stream.rs
@@ -19,12 +19,12 @@
 use crate::streaming::common::test_setup::TestSetup;
 use crate::streaming::create_messages;
 use ahash::AHashMap;
+use compio::fs;
 use iggy::prelude::*;
 use server::state::system::StreamState;
 use server::streaming::polling_consumer::PollingConsumer;
 use server::streaming::segments::IggyMessagesBatchMut;
 use server::streaming::streams::stream::Stream;
-use compio::fs;
 
 #[compio::test]
 async fn should_persist_stream_with_topics_directory_and_info_file() {
diff --git a/core/integration/tests/streaming/topic.rs 
b/core/integration/tests/streaming/topic.rs
index 0e8812f9..37c4560b 100644
--- a/core/integration/tests/streaming/topic.rs
+++ b/core/integration/tests/streaming/topic.rs
@@ -19,6 +19,7 @@
 use crate::streaming::common::test_setup::TestSetup;
 use crate::streaming::create_messages;
 use ahash::AHashMap;
+use compio::fs;
 use iggy::prelude::*;
 use server::state::system::{PartitionState, TopicState};
 use server::streaming::polling_consumer::PollingConsumer;
@@ -27,7 +28,6 @@ use server::streaming::topics::topic::Topic;
 use std::default::Default;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, AtomicU64};
-use compio::fs;
 
 #[compio::test]
 async fn should_persist_topics_with_partitions_directories_and_info_file() {
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index b171652d..fa7fc48f 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
 normal = ["tracing-appender"]
 
 [package.metadata.cargo-machete]
-ignored = ["rust-s3"]
+ignored = ["rusty-s3"]
 
 [features]
 default = ["mimalloc"]
@@ -94,7 +94,6 @@ prometheus-client = "0.23.1"
 quinn = { workspace = true }
 reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
 ring = "0.17.14"
-rust-s3 = { workspace = true }
 rustls = { workspace = true }
 rustls-pemfile = "2.2.0"
 serde = { workspace = true }
@@ -107,6 +106,7 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 compio = { workspace = true }
+cyper = { workspace = true }
 tokio-rustls = { workspace = true }
 tokio-util = { workspace = true }
 toml = { workspace = true }
@@ -118,6 +118,7 @@ tracing-subscriber = { workspace = true }
 twox-hash = { workspace = true }
 ulid = "1.2.1"
 uuid = { workspace = true }
+rusty-s3 = "0.7.0"
 
 [build-dependencies]
 figment = { version = "0.10.19", features = ["json", "toml", "env"] }
diff --git a/core/server/src/archiver/disk.rs b/core/server/src/archiver/disk.rs
index 0eafa20d..8873fef1 100644
--- a/core/server/src/archiver/disk.rs
+++ b/core/server/src/archiver/disk.rs
@@ -19,10 +19,12 @@
 use crate::archiver::{Archiver, COMPONENT};
 use crate::configs::server::DiskArchiverConfig;
 use crate::server_error::ArchiverError;
+use crate::streaming::utils::file;
+use compio::fs;
+use compio::io::copy;
 use error_set::ErrContext;
 use std::path::Path;
-use tokio::fs;
-use tracing::{debug, info};
+use tracing::{debug, error, info};
 
 #[derive(Debug)]
 pub struct DiskArchiver {
@@ -70,8 +72,7 @@ impl Archiver for DiskArchiver {
         files: &[&str],
         base_directory: Option<String>,
     ) -> Result<(), ArchiverError> {
-        //TODO: Fixme figure this out, we can't use tokio methods there.
-        /* debug!("Archiving files on disk: {:?}", files);
+        debug!("Archiving files on disk: {:?}", files);
         for file in files {
             debug!("Archiving file: {file}");
             let source = Path::new(file);
@@ -89,12 +90,15 @@ impl Archiver for DiskArchiver {
                 .with_error_context(|error| {
                     format!("{COMPONENT} (error: {error}) - failed to create 
file: {file} at path: {destination_path}",)
                 })?;
-            fs::copy(source, destination).await.with_error_context(|error| {
+            let source = file::open(file).await.unwrap();
+            let mut source = std::io::Cursor::new(source);
+            let (destination, _) = 
file::append(&destination_path).await.unwrap();
+            let mut destination = std::io::Cursor::new(destination);
+            copy(&mut source, &mut 
destination).await.with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to copy file: 
{file} to destination: {destination_path}")
             })?;
             debug!("Archived file: {file} at: {destination_path}");
         }
-        */
         Ok(())
     }
 }
diff --git a/core/server/src/archiver/mod.rs b/core/server/src/archiver/mod.rs
index 6d61dcad..6a6f6f50 100644
--- a/core/server/src/archiver/mod.rs
+++ b/core/server/src/archiver/mod.rs
@@ -30,6 +30,11 @@ use std::str::FromStr;
 use crate::archiver::disk::DiskArchiver;
 use crate::archiver::s3::S3Archiver;
 
+pub(crate) struct PutObjectStreamResponse {
+    status: u16,
+    total_size: usize,
+}
+pub(crate) type Region = String;
 pub const COMPONENT: &str = "ARCHIVER";
 
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Default, Display, Copy, 
Clone)]
diff --git a/core/server/src/archiver/s3.rs b/core/server/src/archiver/s3.rs
index bbc9cd3b..4b97a614 100644
--- a/core/server/src/archiver/s3.rs
+++ b/core/server/src/archiver/s3.rs
@@ -16,23 +16,39 @@
  * under the License.
  */
 
-use crate::archiver::{Archiver, COMPONENT};
+use crate::archiver::{Archiver, COMPONENT, PutObjectStreamResponse, Region};
 use crate::configs::server::S3ArchiverConfig;
+use crate::io;
 use crate::server_error::ArchiverError;
-use crate::streaming::utils::file;
+use crate::streaming::utils::{PooledBuffer, file};
+use bytes::BytesMut;
+use compio::buf::{IntoInner, IoBuf};
+use compio::fs;
+use compio::io::{AsyncRead, AsyncReadExt, copy};
+use cyper::{Client, ClientBuilder};
 use error_set::ErrContext;
-use s3::creds::Credentials;
-use s3::{Bucket, Region};
+use futures::future::try_join_all;
+use rusty_s3::actions::{CompleteMultipartUpload, CreateMultipartUpload, 
GetObject, UploadPart};
+use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle};
+use std::collections::HashMap;
+use std::io::Cursor;
+use std::ops::Deref;
 use std::path::Path;
-use tokio::fs;
+use std::sync::Arc;
+use std::time::Duration;
 use tracing::{debug, error, info};
 
 #[derive(Debug)]
 pub struct S3Archiver {
+    client: Client,
     bucket: Bucket,
+    credentials: Credentials,
     tmp_upload_dir: String,
+    expiration: Duration,
 }
 
+pub const CHUNK_SIZE: usize = 8_388_608; // 8 MiB per part; S3 multipart minimum part size is 5 MiB (5_242_880 bytes).
+
 impl S3Archiver {
     /// Creates a new S3 archiver.
     ///
@@ -40,26 +56,25 @@ impl S3Archiver {
     ///
     /// Returns an error if the S3 client cannot be initialized or credentials 
are invalid.
     pub fn new(config: S3ArchiverConfig) -> Result<Self, ArchiverError> {
-        let credentials = Credentials::new(
-            Some(&config.key_id),
-            Some(&config.key_secret),
-            None,
-            None,
-            None,
-        )
-        .map_err(|_| ArchiverError::InvalidS3Credentials)?;
-
-        let bucket = Bucket::new(
-            &config.bucket,
-            Region::Custom {
-                endpoint: config.endpoint.map_or_else(String::new, |e| e),
-                region: config.region.map_or_else(String::new, |r| r),
-            },
-            credentials,
-        )
-        .map_err(|_| ArchiverError::CannotInitializeS3Archiver)?;
+        let credentials = Credentials::new(&config.key_id, &config.key_secret);
+        let region: Region = config.region.map_or_else(String::new, |r| r);
+        let endpoint = config
+            .endpoint
+            .map_or_else(String::new, |e| e)
+            .parse()
+            .expect("Endpoint should be valid URL");
+        let path_style = UrlStyle::VirtualHost;
+        let name = config.bucket;
+
+        let bucket = Bucket::new(endpoint, path_style, name, region)?;
+        // TODO: Make the presigned-URL expiration configurable.
+        let expiration = Duration::from_secs(60);
+        let client = ClientBuilder::new().use_rustls_default().build();
         Ok(Self {
-            bucket: *bucket,
+            bucket,
+            client,
+            credentials,
+            expiration,
             tmp_upload_dir: config.tmp_upload_dir,
         })
     }
@@ -69,9 +84,10 @@ impl S3Archiver {
             "Copying file: {path} to temporary S3 upload directory: {}",
             self.tmp_upload_dir
         );
-        let source = Path::new(path);
+        let source = file::open(&path).await?;
+        let mut source = std::io::Cursor::new(source);
         let destination = Path::new(&self.tmp_upload_dir).join(path);
-        let destination_path = 
destination.to_str().unwrap_or_default().to_owned();
+        let destination_path = self.tmp_upload_dir.to_owned();
         debug!("Creating temporary S3 upload directory: {destination_path}");
         fs::create_dir_all(destination.parent().expect("Path should have a 
parent directory"))
             .await
@@ -80,18 +96,123 @@ impl S3Archiver {
                     "{COMPONENT} (error: {error}) - failed to create temporary 
S3 upload directory for path: {destination_path}"
                 )
             })?;
+        let destination = file::open(&self.tmp_upload_dir).await?;
+        let mut destination = std::io::Cursor::new(destination);
         debug!("Copying file: {path} to temporary S3 upload path: 
{destination_path}");
-        fs::copy(source, &destination).await.with_error_context(|error| {
+        copy(&mut source, &mut destination).await.with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to copy file: 
{path} to temporary S3 upload path: {destination_path}")
         })?;
         debug!("File: {path} copied to temporary S3 upload path: 
{destination_path}");
         Ok(destination_path)
     }
+
+    async fn put_object_stream(
+        &self,
+        reader: &mut impl AsyncReadExt,
+        destination_path: &str,
+    ) -> Result<PutObjectStreamResponse, ArchiverError> {
+        let buf = BytesMut::with_capacity(CHUNK_SIZE);
+        let (result, chunk) = 
reader.read_exact(buf.slice(..CHUNK_SIZE)).await.into();
+        result?;
+        let buf = chunk.into_inner().freeze();
+        if buf.len() < CHUNK_SIZE {
+            // Normal upload
+            let action = self
+                .bucket
+                .put_object(Some(&self.credentials), destination_path);
+            let url = action.sign(self.expiration);
+            let buf_len = buf.len();
+            let response = self.client.put(url)?.body(buf).send().await?;
+            return Ok(PutObjectStreamResponse {
+                status: response.status().as_u16(),
+                total_size: buf_len,
+            });
+        }
+
+        let action = self
+            .bucket
+            .create_multipart_upload(Some(&self.credentials), 
destination_path);
+        let url = action.sign(self.expiration);
+        let response = self.client.post(url)?.send().await?;
+        let body = response.text().await?;
+        let response =
+            CreateMultipartUpload::parse_response(&body).expect("Failed to 
parse response");
+        let upload_id = response.upload_id();
+
+        let mut total_size = 0;
+        let mut part_number = 0;
+        let mut handles = Vec::new();
+        let action = self.bucket.upload_part(
+            Some(&self.credentials),
+            destination_path,
+            part_number,
+            upload_id,
+        );
+        let url = action.sign(self.expiration);
+        let buf_size = buf.len();
+        let response = self.client.put(url)?.body(buf).send();
+        total_size += buf_size;
+        part_number += 1;
+        handles.push(response);
+        loop {
+            let buf = BytesMut::with_capacity(CHUNK_SIZE);
+            let (result, chunk) = 
reader.read_exact(buf.slice(..CHUNK_SIZE)).await.into();
+            result?;
+            let buf = chunk.into_inner().freeze();
+            let buf_len = buf.len();
+            let done = buf_len < CHUNK_SIZE;
+            let action = self.bucket.upload_part(
+                Some(&self.credentials),
+                destination_path,
+                part_number,
+                upload_id,
+            );
+            let url = action.sign(self.expiration);
+            let response = self.client.put(url)?.body(buf).send();
+            part_number += 1;
+            total_size += buf_len;
+
+            handles.push(response);
+            if done {
+                break;
+            }
+        }
+        let responses = try_join_all(handles).await?;
+        let mut etags = Vec::new();
+        for response in responses {
+            let status_code = response.status().as_u16();
+            if status_code == 200 {
+                let etag = response
+                    .headers()
+                    .get("etag")
+                    .unwrap()
+                    .to_str()
+                    .unwrap()
+                    .to_owned();
+                etags.push(etag);
+            }
+        }
+        let action = self.bucket.complete_multipart_upload(
+            Some(&self.credentials),
+            destination_path,
+            upload_id,
+            etags.iter().map(|s| s.as_str()),
+        );
+        let url = action.sign(self.expiration);
+        let body = CompleteMultipartUpload::body(action);
+        let response = self.client.post(url)?.body(body).send().await?;
+        Ok(PutObjectStreamResponse {
+            status: response.status().as_u16(),
+            total_size,
+        })
+    }
 }
 
 impl Archiver for S3Archiver {
     async fn init(&self) -> Result<(), ArchiverError> {
-        let response = self.bucket.list("/".to_string(), None).await;
+        let action = self.bucket.list_objects_v2(Some(&self.credentials));
+        let url = action.sign(self.expiration);
+        let response = self.client.get(url)?.send().await;
         if let Err(error) = response {
             error!("Cannot initialize S3 archiver: {error}");
             return Err(ArchiverError::CannotInitializeS3Archiver);
@@ -102,7 +223,7 @@ impl Archiver for S3Archiver {
                 "Removing existing S3 archiver temporary upload directory: {}",
                 self.tmp_upload_dir
             );
-            fs::remove_dir_all(&self.tmp_upload_dir).await?;
+            io::fs_utils::remove_dir_all(&self.tmp_upload_dir).await?;
         }
         info!(
             "Creating S3 archiver temporary upload directory: {}",
@@ -121,14 +242,19 @@ impl Archiver for S3Archiver {
         let base_directory = base_directory.as_deref().unwrap_or_default();
         let destination = Path::new(&base_directory).join(file);
         let destination_path = 
destination.to_str().unwrap_or_default().to_owned();
-        let response = self.bucket.get_object_tagging(destination_path).await;
+        let mut object_tagging = self
+            .bucket
+            .get_object(Some(&self.credentials), &destination_path);
+        object_tagging.query_mut().insert("tagging", "");
+        let url = object_tagging.sign(self.expiration);
+        let response = self.client.get(url)?.send().await;
         if response.is_err() {
             debug!("File: {file} is not archived on S3.");
             return Ok(false);
         }
 
-        let (_, status) = response.expect("Response should be valid if not an 
error");
-        if status == 200 {
+        let response = response.expect("Response should be valid if not an 
error");
+        if response.status() == 200 {
             debug!("File: {file} is archived on S3.");
             return Ok(true);
         }
@@ -151,19 +277,15 @@ impl Archiver for S3Archiver {
 
             let source = self.copy_file_to_tmp(path).await?;
             debug!("Archiving file: {source} on S3.");
-            let mut file = file::open(&source)
+            let file = file::open(&source)
                 .await
                 .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to open source file: {source} for archiving"))?;
+            let mut reader = std::io::Cursor::new(file);
             let base_directory = base_directory.as_deref().unwrap_or_default();
             let destination = Path::new(&base_directory).join(path);
             let destination_path = 
destination.to_str().unwrap_or_default().to_owned();
-            // TODO: Fixme figure this out.
-            // The `put_object_stream` method requires `AsyncRead` trait from 
tokio as its reader.
-            /*
-            let response = self
-                .bucket
-                .put_object_stream(&mut file, destination_path)
-                .await;
+            // Uploads the file to S3, switching to multipart upload when it exceeds CHUNK_SIZE.
+            let response = self.put_object_stream(&mut reader, 
&destination_path).await;
             if let Err(error) = response {
                 error!("Cannot archive file: {path} on S3: {}", error);
                 fs::remove_file(&source).await.with_error_context(|error| {
@@ -173,9 +295,7 @@ impl Archiver for S3Archiver {
                     file_path: (*path).to_string(),
                 });
             }
-
-            let response = response.expect("Response should be valid if not an 
error");
-            let status = response.status_code();
+            let PutObjectStreamResponse { status, total_size } = 
response.unwrap();
             if status == 200 {
                 debug!("Archived file: {path} on S3.");
                 fs::remove_file(&source).await.with_error_context(|error| {
@@ -191,7 +311,6 @@ impl Archiver for S3Archiver {
             return Err(ArchiverError::CannotArchiveFile {
                 file_path: (*path).to_string(),
             });
-            */
         }
         Ok(())
     }
diff --git a/core/server/src/channels/commands/mod.rs 
b/core/server/src/channels/commands/mod.rs
index 464277bb..13b1bf91 100644
--- a/core/server/src/channels/commands/mod.rs
+++ b/core/server/src/channels/commands/mod.rs
@@ -18,7 +18,6 @@
 
 pub mod archive_state;
 pub mod clean_personal_access_tokens;
-pub mod maintain_messages;
 pub mod print_sysinfo;
 pub mod save_messages;
 pub mod verify_heartbeats;
diff --git a/core/server/src/channels/handler.rs 
b/core/server/src/channels/handler.rs
index 7612a259..ee44fd58 100644
--- a/core/server/src/channels/handler.rs
+++ b/core/server/src/channels/handler.rs
@@ -1,3 +1,5 @@
+use std::rc::Rc;
+
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,21 +17,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-//TODO: Fixme
-/*
 use super::server_command::BackgroundServerCommand;
-use crate::configs::server::ServerConfig;
-use crate::streaming::systems::system::SharedSystem;
+use crate::{configs::server::ServerConfig, shard::IggyShard};
 
 pub struct BackgroundServerCommandHandler<'a> {
-    system: SharedSystem,
+    shard: Rc<IggyShard>,
     config: &'a ServerConfig,
 }
 
 impl<'a> BackgroundServerCommandHandler<'a> {
-    pub fn new(system: SharedSystem, config: &'a ServerConfig) -> Self {
-        Self { system, config }
+    pub fn new(shard: Rc<IggyShard>, config: &'a ServerConfig) -> Self {
+        Self { shard, config }
     }
 
     pub fn install_handler<C, E>(&mut self, mut executor: E) -> Self
@@ -37,14 +35,12 @@ impl<'a> BackgroundServerCommandHandler<'a> {
         E: BackgroundServerCommand<C> + Send + Sync + 'static,
     {
         let (sender, receiver) = flume::unbounded();
-        let system = self.system.clone();
-        executor.start_command_sender(system.clone(), self.config, sender);
-        executor.start_command_consumer(system.clone(), self.config, receiver);
+        let shard = self.shard.clone();
+        executor.start_command_sender(shard.clone(), self.config, sender);
+        executor.start_command_consumer(shard.clone(), self.config, receiver);
         Self {
-            system,
+            shard,
             config: self.config,
         }
     }
 }
-
-*/
diff --git a/core/server/src/channels/server_command.rs 
b/core/server/src/channels/server_command.rs
index 16b0dc2f..ac490737 100644
--- a/core/server/src/channels/server_command.rs
+++ b/core/server/src/channels/server_command.rs
@@ -16,29 +16,24 @@
  * under the License.
  */
 
-//TODO: Fixme
-/*
-use crate::configs::server::ServerConfig;
-use crate::streaming::systems::system::SharedSystem;
+use crate::{configs::server::ServerConfig, shard::IggyShard};
 use flume::{Receiver, Sender};
-use std::future::Future;
+use std::{future::Future, rc::Rc};
 
 pub trait BackgroundServerCommand<C> {
-    fn execute(&mut self, system: &SharedSystem, command: C) -> impl 
Future<Output = ()>;
+    fn execute(&mut self, system: &IggyShard, command: C) -> impl 
Future<Output = ()>;
 
     fn start_command_sender(
         &mut self,
-        system: SharedSystem,
+        shard: Rc<IggyShard>,
         config: &ServerConfig,
         sender: Sender<C>,
     );
 
     fn start_command_consumer(
         self,
-        system: SharedSystem,
+        shard: Rc<IggyShard>,
         config: &ServerConfig,
         receiver: Receiver<C>,
     );
 }
-
-*/
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2971347e..b1481c22 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -19,6 +19,7 @@
 use std::collections::HashSet;
 use std::rc::Rc;
 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 
 use anyhow::Result;
 use clap::Parser;
@@ -28,6 +29,7 @@ use figlet_rs::FIGfont;
 use iggy_common::create_user::CreateUser;
 use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
+use server::archiver::{ArchiverKind, ArchiverKindType};
 use server::args::Args;
 use server::bootstrap::{
     create_directories, create_root_user, create_shard_connections, 
create_shard_executor,
@@ -172,6 +174,30 @@ fn main() -> Result<(), ServerError> {
                         )),
                         false => None,
                     };
+                    let archiver_config = &config.data_maintenance.archiver;
+                    let archiver: Option<ArchiverKind> = if 
archiver_config.enabled {
+                        info!("Archiving is enabled, kind: {}", 
archiver_config.kind);
+                        match archiver_config.kind {
+                            ArchiverKindType::Disk => 
Some(ArchiverKind::get_disk_archiver(
+                                archiver_config
+                                    .disk
+                                    .clone()
+                                    .expect("Disk archiver config is missing"),
+                            )),
+                            ArchiverKindType::S3 => Some(
+                                ArchiverKind::get_s3_archiver(
+                                    archiver_config
+                                        .s3
+                                        .clone()
+                                        .expect("S3 archiver config is 
missing"),
+                                )
+                                .expect("Failed to create S3 archiver"),
+                            ),
+                        }
+                    } else {
+                        info!("Archiving is disabled.");
+                        None
+                    };
 
                     let state = StateKind::File(FileState::new(
                         &config.system.get_state_messages_file_path(),
@@ -251,6 +277,7 @@ fn main() -> Result<(), ServerError> {
                         .id(id)
                         .connections(connections)
                         .config(config)
+                        .archiver(archiver)
                         .encryptor(encryptor)
                         .version(version)
                         .state(state)
diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs
index 16e2ef92..ca29a627 100644
--- a/core/server/src/server_error.rs
+++ b/core/server/src/server_error.rs
@@ -18,6 +18,7 @@
 
 use error_set::error_set;
 use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, 
WriteError};
+use rusty_s3::BucketError;
 use std::array::TryFromSliceError;
 use tokio::io;
 
@@ -45,6 +46,7 @@ error_set!(
         #[display("Invalid configuration")]
         InvalidConfiguration,
 
+
         #[display("Cache config validation failure")]
         CacheConfigValidationFailure,
     };
@@ -59,6 +61,12 @@ error_set!(
         #[display("Invalid S3 credentials")]
         InvalidS3Credentials,
 
+        #[display("HTTP request error: {0}")]
+        CyperError(cyper::Error),
+
+        #[display("Invalid S3 Bucket configuration")]
+        BucketError(BucketError),
+
         #[display("Cannot archive file: {}", file_path)]
         CannotArchiveFile { file_path: String },
     } || IoError;
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index eb851271..c49cdeb6 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -26,6 +26,7 @@ use iggy_common::{Aes256GcmEncryptor, EncryptorKind};
 use tracing::info;
 
 use crate::{
+    archiver::{Archiver, ArchiverKind},
     bootstrap::resolve_persister,
     configs::server::ServerConfig,
     map_toggle_str,
@@ -44,6 +45,7 @@ pub struct IggyShardBuilder {
     config: Option<ServerConfig>,
     encryptor: Option<EncryptorKind>,
     version: Option<SemanticVersion>,
+    archiver: Option<ArchiverKind>,
     state: Option<StateKind>,
 }
 
@@ -78,6 +80,11 @@ impl IggyShardBuilder {
         self
     }
 
+    pub fn archiver(mut self, archiver: Option<ArchiverKind>) -> Self {
+        self.archiver = archiver;
+        self
+    }
+
     // TODO: Too much happens in there, some of those bootstrapping logic 
should be moved outside.
     pub fn build(self) -> IggyShard {
         let id = self.id.unwrap();
@@ -105,6 +112,7 @@ impl IggyShardBuilder {
             config.system.clone(),
             partition_persister,
         ));
+        let archiver = self.archiver.map(Rc::new);
 
         IggyShard {
             id: id,
@@ -112,6 +120,7 @@ impl IggyShardBuilder {
             shards_table: Default::default(),
             storage: storage,
             encryptor: encryptor,
+            archiver: archiver,
             state: state,
             config: config,
             version: version,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index b884a92d..43280c77 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -50,12 +50,16 @@ use tracing::{error, info, instrument, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver, 
StopSender};
 
 use crate::{
+    archiver::ArchiverKind,
     configs::server::ServerConfig,
     io::fs_utils,
     shard::{
         system::info::SystemInfo,
         task_registry::TaskRegistry,
-        tasks::messages::spawn_shard_message_task,
+        tasks::{
+            auxilary::maintain_messages::spawn_message_maintainance_task,
+            messages::spawn_shard_message_task,
+        },
         transmission::{
             event::ShardEvent,
             frame::{ShardFrame, ShardResponse},
@@ -141,6 +145,7 @@ pub struct IggyShard {
 
     pub(crate) state: StateKind,
     pub(crate) encryptor: Option<EncryptorKind>,
+    pub(crate) archiver: Option<Rc<ArchiverKind>>,
     pub(crate) config: ServerConfig,
     //TODO: This could be shared.
     pub(crate) client_manager: RefCell<ClientManager>,
@@ -182,8 +187,8 @@ impl IggyShard {
         let (stop_sender, stop_receiver) = async_channel::unbounded();
 
         let shard = Self {
-            id: 0,              // Default shard ID
-            shards: Vec::new(), // No other shards in default config
+            id: 0,
+            shards: Vec::new(),
             shards_table: Default::default(),
             version,
             streams: Default::default(),
@@ -191,6 +196,7 @@ impl IggyShard {
             storage,
             state,
             encryptor: None,
+            archiver: None,
             config: server_config,
             client_manager: Default::default(),
             active_sessions: Default::default(),
@@ -223,15 +229,12 @@ impl IggyShard {
         let _ = self.load_users(users.into_values().collect()).await;
         let _ = self.load_streams(streams.into_values().collect()).await;
 
-        //TODO: Fix the archiver.
-        /*
         if let Some(archiver) = self.archiver.as_ref() {
             archiver
                 .init()
                 .await
                 .expect("Failed to initialize archiver");
         }
-        */
         info!("Initialized system in {} ms.", now.elapsed().as_millis());
         Ok(())
     }
@@ -248,6 +251,7 @@ impl IggyShard {
 
         // Create all tasks (tcp listener, http listener, command processor, 
in the future also the background jobs).
         let mut tasks: Vec<Task> = 
vec![Box::pin(spawn_shard_message_task(self.clone()))];
+        tasks.push(Box::pin(spawn_message_maintainance_task(self.clone())));
         if self.config.tcp.enabled {
             tasks.push(Box::pin(spawn_tcp_server(self.clone())));
         }
diff --git a/core/server/src/channels/commands/maintain_messages.rs 
b/core/server/src/shard/tasks/auxilary/maintain_messages.rs
similarity index 59%
rename from core/server/src/channels/commands/maintain_messages.rs
rename to core/server/src/shard/tasks/auxilary/maintain_messages.rs
index 77a60999..e3edea69 100644
--- a/core/server/src/channels/commands/maintain_messages.rs
+++ b/core/server/src/shard/tasks/auxilary/maintain_messages.rs
@@ -1,222 +1,124 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-//TODO: Fixme
-/*
-
-use crate::archiver::ArchiverKind;
-use crate::channels::server_command::BackgroundServerCommand;
-use crate::configs::server::MessagesMaintenanceConfig;
-use crate::map_toggle_str;
-use crate::streaming::systems::system::SharedSystem;
-use crate::streaming::topics::topic::Topic;
+use crate::{
+    archiver::ArchiverKind, configs::server::MessagesMaintenanceConfig, 
map_toggle_str,
+    shard::IggyShard, streaming::topics::topic::Topic,
+};
+use compio::time;
 use error_set::ErrContext;
-use flume::Sender;
-use iggy_common::IggyDuration;
-use iggy_common::IggyError;
-use iggy_common::IggyTimestamp;
-use iggy_common::locking::IggyRwLockFn;
-use std::sync::Arc;
-use tokio::time;
-use tracing::{debug, error, info, instrument, trace};
-
-pub struct MessagesMaintainer {
-    cleaner_enabled: bool,
-    archiver_enabled: bool,
-    interval: IggyDuration,
-    sender: Sender<MaintainMessagesCommand>,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct MaintainMessagesCommand {
-    clean_messages: bool,
-    archive_messages: bool,
-}
-
-#[derive(Debug, Default, Clone)]
-pub struct MaintainMessagesExecutor;
-
-impl MessagesMaintainer {
-    pub fn new(
-        config: &MessagesMaintenanceConfig,
-        sender: Sender<MaintainMessagesCommand>,
-    ) -> Self {
-        Self {
-            cleaner_enabled: config.cleaner_enabled,
-            archiver_enabled: config.archiver_enabled,
-            interval: config.interval,
-            sender,
-        }
+use futures::FutureExt;
+use iggy_common::{IggyDuration, IggyError, IggyTimestamp, 
locking::IggyRwLockFn};
+use std::{rc::Rc, time::Duration};
+use tracing::{debug, error, info, trace};
+
+pub async fn spawn_message_maintainance_task(shard: Rc<IggyShard>) -> 
Result<(), IggyError> {
+    let config = &shard.config.data_maintenance.messages;
+    if !config.cleaner_enabled && !config.archiver_enabled {
+        info!("Messages maintainer is disabled.");
+        return Ok(());
     }
 
-    pub fn start(&self) {
-        if !self.cleaner_enabled && !self.archiver_enabled {
-            info!("Messages maintainer is disabled.");
-            return;
-        }
-
-        let interval = self.interval;
-        let sender = self.sender.clone();
-        info!(
-            "Message maintainer, cleaner is {}, archiver is {}, interval: 
{interval}",
-            map_toggle_str(self.cleaner_enabled),
-            map_toggle_str(self.archiver_enabled)
-        );
-        let clean_messages = self.cleaner_enabled;
-        let archive_messages = self.archiver_enabled;
-        tokio::spawn(async move {
-            let mut interval_timer = time::interval(interval.get_duration());
+    let interval = config.interval;
+    info!(
+        "Message maintainer, cleaner is {}, archiver is {}, interval: 
{interval}",
+        map_toggle_str(config.cleaner_enabled),
+        map_toggle_str(config.archiver_enabled)
+    );
+    let clean_messages = config.cleaner_enabled;
+    let mut interval_timer = time::interval(interval.get_duration());
+    loop {
+        let shutdown_check = async {
             loop {
-                interval_timer.tick().await;
-                sender
-                    .send(MaintainMessagesCommand {
-                        clean_messages,
-                        archive_messages,
-                    })
-                    .unwrap_or_else(|err| {
-                        error!("Failed to send MaintainMessagesCommand. Error: 
{}", err);
-                    });
-            }
-        });
-    }
-}
-
-impl BackgroundServerCommand<MaintainMessagesCommand> for 
MaintainMessagesExecutor {
-    #[instrument(skip_all, name = "trace_maintain_messages")]
-    async fn execute(&mut self, system: &SharedSystem, command: 
MaintainMessagesCommand) {
-        let system = system.read().await;
-        let streams = system.get_streams();
-        for stream in streams {
-            let topics = stream.get_topics();
-            for topic in topics {
-                let archiver = if command.archive_messages {
-                    system.archiver.clone()
-                } else {
-                    None
-                };
-                let expired_segments = handle_expired_segments(
-                    topic,
-                    archiver.clone(),
-                    system.config.segment.archive_expired,
-                    command.clean_messages,
-                )
-                .await;
-                if expired_segments.is_err() {
-                    error!(
-                        "Failed to get expired segments for stream ID: {}, 
topic ID: {}",
-                        topic.stream_id, topic.topic_id
-                    );
-                    continue;
+                if shard.is_shutting_down() {
+                    return;
                 }
-
-                let oldest_segments = handle_oldest_segments(
-                    topic,
-                    archiver.clone(),
-                    system.config.topic.delete_oldest_segments,
-                )
-                .await;
-                if oldest_segments.is_err() {
-                    error!(
-                        "Failed to get oldest segments for stream ID: {}, 
topic ID: {}",
-                        topic.stream_id, topic.topic_id
-                    );
-                    continue;
-                }
-
-                let deleted_expired_segments = expired_segments.unwrap();
-                let deleted_oldest_segments = oldest_segments.unwrap();
-                let deleted_segments = HandledSegments {
-                    segments_count: deleted_expired_segments.segments_count
-                        + deleted_oldest_segments.segments_count,
-                    messages_count: deleted_expired_segments.messages_count
-                        + deleted_oldest_segments.messages_count,
-                };
-
-                if deleted_segments.segments_count == 0 {
-                    trace!(
-                        "No segments were deleted for stream ID: {}, topic ID: 
{}",
-                        topic.stream_id, topic.topic_id
-                    );
-                    continue;
-                }
-
-                info!(
-                    "Deleted {} segments and {} messages for stream ID: {}, 
topic ID: {}",
-                    deleted_segments.segments_count,
-                    deleted_segments.messages_count,
-                    topic.stream_id,
-                    topic.topic_id
-                );
-
-                system
-                    .metrics
-                    .decrement_segments(deleted_segments.segments_count);
-                system
-                    .metrics
-                    .decrement_messages(deleted_segments.messages_count);
+                compio::time::sleep(Duration::from_millis(100)).await;
             }
-        }
-    }
+        };
+        let archiver = shard.archiver.clone();
+        let fut = interval_timer.tick();
+
+        futures::select! {
+            _ = shutdown_check.fuse() => {
+                info!("Shard {} message maintainer shutting down", shard.id);
+                break;
+            }
+            _ = fut.fuse() => {
+                let streams = shard.get_streams();
+                for stream in streams {
+                    let topics = stream.get_topics();
+                    for topic in topics {
+                        let expired_segments = handle_expired_segments(
+                            topic,
+                            archiver.clone(),
+                            shard.config.system.segment.archive_expired,
+                            clean_messages,
+                        )
+                        .await;
+                        if expired_segments.is_err() {
+                            error!(
+                                "Failed to get expired segments for stream ID: 
{}, topic ID: {}",
+                                topic.stream_id, topic.topic_id
+                            );
+                            continue;
+                        }
+
+                        let stream_id = stream.stream_id;
+                        let oldest_segments = handle_oldest_segments(
+                            stream_id,
+                            topic,
+                            archiver.clone(),
+                            shard.config.system.topic.delete_oldest_segments,
+                        )
+                        .await;
+                        if oldest_segments.is_err() {
+                            error!(
+                                "Failed to get oldest segments for stream ID: 
{}, topic ID: {}",
+                                topic.stream_id, topic.topic_id
+                            );
+                            continue;
+                        }
+
+                        let deleted_expired_segments = 
expired_segments.unwrap();
+                        let deleted_oldest_segments = oldest_segments.unwrap();
+                        let deleted_segments = HandledSegments {
+                            segments_count: 
deleted_expired_segments.segments_count
+                                + deleted_oldest_segments.segments_count,
+                            messages_count: 
deleted_expired_segments.messages_count
+                                + deleted_oldest_segments.messages_count,
+                        };
+
+                        if deleted_segments.segments_count == 0 {
+                            trace!(
+                                "No segments were deleted for stream ID: {}, 
topic ID: {}",
+                                topic.stream_id, topic.topic_id
+                            );
+                            continue;
+                        }
+
+                        info!(
+                            "Deleted {} segments and {} messages for stream 
ID: {}, topic ID: {}",
+                            deleted_segments.segments_count,
+                            deleted_segments.messages_count,
+                            topic.stream_id,
+                            topic.topic_id
+                        );
 
-    fn start_command_sender(
-        &mut self,
-        _system: SharedSystem,
-        config: &crate::configs::server::ServerConfig,
-        sender: Sender<MaintainMessagesCommand>,
-    ) {
-        if (!config.data_maintenance.archiver.enabled
-            || !config.data_maintenance.messages.archiver_enabled)
-            && !config.data_maintenance.messages.cleaner_enabled
-        {
-            return;
+                        shard
+                            .metrics
+                            
.decrement_segments(deleted_segments.segments_count);
+                        shard
+                            .metrics
+                            
.decrement_messages(deleted_segments.messages_count);
+                    }
+            }
         }
-
-        let messages_maintainer =
-            MessagesMaintainer::new(&config.data_maintenance.messages, sender);
-        messages_maintainer.start();
-    }
-
-    fn start_command_consumer(
-        mut self,
-        system: SharedSystem,
-        config: &crate::configs::server::ServerConfig,
-        receiver: flume::Receiver<MaintainMessagesCommand>,
-    ) {
-        if (!config.data_maintenance.archiver.enabled
-            || !config.data_maintenance.messages.archiver_enabled)
-            && !config.data_maintenance.messages.cleaner_enabled
-        {
-            return;
         }
-
-        tokio::spawn(async move {
-            let system = system.clone();
-            while let Ok(command) = receiver.recv_async().await {
-                self.execute(&system, command).await;
-            }
-            info!("Messages maintainer receiver stopped.");
-        });
     }
+    Ok(())
 }
 
 async fn handle_expired_segments(
     topic: &Topic,
-    archiver: Option<Arc<ArchiverKind>>,
+    archiver: Option<Rc<ArchiverKind>>,
     archive: bool,
     clean: bool,
 ) -> Result<HandledSegments, IggyError> {
@@ -287,8 +189,9 @@ async fn get_expired_segments(topic: &Topic, now: 
IggyTimestamp) -> Vec<Segments
 }
 
 async fn handle_oldest_segments(
+    stream_id: u32,
     topic: &Topic,
-    archiver: Option<Arc<ArchiverKind>>,
+    archiver: Option<Rc<ArchiverKind>>,
     delete_oldest_segments: bool,
 ) -> Result<HandledSegments, IggyError> {
     if let Some(archiver) = archiver {
@@ -296,11 +199,7 @@ async fn handle_oldest_segments(
         for partition in topic.partitions.values() {
             let mut start_offsets = Vec::new();
             let partition = partition.read().await;
-            for segment in partition.get_segments() {
-                if !segment.is_closed() {
-                    continue;
-                }
-
+            for segment in partition.get_segments().iter().filter(|s| 
s.is_closed()) {
                 let is_archived = 
archiver.is_archived(segment.index_file_path(), None).await;
                 if is_archived.is_err() {
                     error!(
@@ -325,6 +224,7 @@ async fn handle_oldest_segments(
                     start_offsets.push(segment.start_offset());
                 }
             }
+
             if !start_offsets.is_empty() {
                 info!(
                     "Found {} segments to archive for stream ID: {}, topic ID: 
{}, partition ID: {}",
@@ -340,11 +240,13 @@ async fn handle_oldest_segments(
             }
         }
 
+        let segments_count = segments_to_archive
+            .iter()
+            .map(|s| s.start_offsets.len())
+            .sum::<usize>();
         info!(
             "Archiving {} oldest segments for stream ID: {}, topic ID: {}...",
-            segments_to_archive.len(),
-            topic.stream_id,
-            topic.topic_id,
+            segments_count, topic.stream_id, topic.topic_id,
         );
         archive_segments(topic, &segments_to_archive, archiver.clone())
             .await
@@ -446,17 +348,19 @@ impl HandledSegments {
 async fn archive_segments(
     topic: &Topic,
     segments_to_archive: &[SegmentsToHandle],
-    archiver: Arc<ArchiverKind>,
+    archiver: Rc<ArchiverKind>,
 ) -> Result<u64, IggyError> {
     if segments_to_archive.is_empty() {
         return Ok(0);
     }
 
+    let segments_count = segments_to_archive
+        .iter()
+        .map(|s| s.start_offsets.len())
+        .sum::<usize>();
     info!(
         "Found {} segments to archive for stream ID: {}, topic ID: {}, 
archiving...",
-        segments_to_archive.len(),
-        topic.stream_id,
-        topic.topic_id
+        segments_count, topic.stream_id, topic.topic_id
     );
 
     let mut archived_segments = 0;
@@ -556,5 +460,3 @@ async fn delete_segments(
         messages_count,
     })
 }
-
-*/
diff --git a/core/server/src/shard/tasks/auxilary/mod.rs 
b/core/server/src/shard/tasks/auxilary/mod.rs
new file mode 100644
index 00000000..944cab55
--- /dev/null
+++ b/core/server/src/shard/tasks/auxilary/mod.rs
@@ -0,0 +1 @@
+pub mod maintain_messages;
diff --git a/core/server/src/shard/tasks/mod.rs 
b/core/server/src/shard/tasks/mod.rs
index ba63992f..a5ec47da 100644
--- a/core/server/src/shard/tasks/mod.rs
+++ b/core/server/src/shard/tasks/mod.rs
@@ -1 +1,2 @@
+pub mod auxilary;
 pub mod messages;
diff --git a/core/server/src/streaming/partitions/persistence.rs 
b/core/server/src/streaming/partitions/persistence.rs
index 824bbddf..e4ee2d9e 100644
--- a/core/server/src/streaming/partitions/persistence.rs
+++ b/core/server/src/streaming/partitions/persistence.rs
@@ -19,11 +19,11 @@
 use crate::state::system::PartitionState;
 use crate::streaming::partitions::COMPONENT;
 use crate::streaming::partitions::partition::Partition;
+use compio::fs::create_dir_all;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use std::path::Path;
 use std::sync::atomic::Ordering;
-use compio::fs::create_dir_all;
 use tracing::error;
 
 impl Partition {
diff --git a/core/server/src/streaming/segments/writing_messages.rs 
b/core/server/src/streaming/segments/writing_messages.rs
index df177bdf..e2b0fbb9 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -159,7 +159,9 @@ impl Segment {
             }
             self.shutdown_writing().await;
             info!(
-                "Closed segment with start offset: {}, end offset: {}, size: 
{} for partition with ID: {}.",
+                "Closed segment for stream: {}, topic: {} with start offset: 
{}, end offset: {}, size: {} for partition with ID: {}.",
+                self.stream_id,
+                self.topic_id,
                 self.start_offset,
                 self.end_offset,
                 self.get_messages_size(),
diff --git a/core/server/src/streaming/utils/file.rs 
b/core/server/src/streaming/utils/file.rs
index de00aa5e..6f70363c 100644
--- a/core/server/src/streaming/utils/file.rs
+++ b/core/server/src/streaming/utils/file.rs
@@ -19,10 +19,6 @@
 use compio::fs::{File, OpenOptions, remove_file};
 use std::path::Path;
 
-pub fn open_std(path: &str) -> Result<std::fs::File, std::io::Error> {
-    std::fs::OpenOptions::new().read(true).open(path)
-}
-
 pub async fn open(path: &str) -> Result<File, std::io::Error> {
     OpenOptions::new().read(true).open(path).await
 }

Reply via email to