This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new 6cbf8c4a feat: update disk cache in another thread to avoid blocking
normal query process (#1431)
6cbf8c4a is described below
commit 6cbf8c4a18502462bd85f14a692d161446a1ac47
Author: Jiacai Liu <[email protected]>
AuthorDate: Tue Jan 9 16:46:19 2024 +0800
feat: update disk cache in another thread to avoid blocking normal query
process (#1431)
## Rationale
When there is a cache miss in the disk cache, it will
1. Fetch data from remote
2. Insert data to cache, which will incur disk IO
3. Return the data for query.
We can move the second step to another thread so that it does not block the
normal query process.
## Detailed Changes
- Make disk writes nonblocking
- Block in tests explicitly, otherwise the error below will be thrown
> Cannot drop a runtime in a context where blocking is not allowed. This
happens when a runtime is dropped from within an asynchronous context.
## Test Plan
CI
---
analytic_engine/src/setup.rs | 1 +
components/object_store/src/disk_cache.rs | 699 +++++++++++++++++-------------
2 files changed, 388 insertions(+), 312 deletions(-)
diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs
index 371fadc3..1c55c925 100644
--- a/analytic_engine/src/setup.rs
+++ b/analytic_engine/src/setup.rs
@@ -241,6 +241,7 @@ fn open_storage(
opts.disk_cache_page_size.as_byte() as usize,
store,
opts.disk_cache_partition_bits,
+ engine_runtimes.io_runtime.clone(),
)
.await
.context(OpenObjectStore)?,
diff --git a/components/object_store/src/disk_cache.rs
b/components/object_store/src/disk_cache.rs
index 53d537ff..937181a6 100644
--- a/components/object_store/src/disk_cache.rs
+++ b/components/object_store/src/disk_cache.rs
@@ -31,6 +31,7 @@ use logger::{debug, warn};
use lru::LruCache;
use notifier::notifier::{ExecutionGuard, RequestNotifiers};
use partitioned_lock::PartitionedMutex;
+use runtime::RuntimeRef;
use serde::{Deserialize, Serialize};
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use time_ext;
@@ -261,10 +262,10 @@ struct PageMeta {
// TODO: Introduce the CRC for integration check.
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
struct DiskCache {
root_dir: String,
- meta_cache: PartitionedMutex<PageMetaCache, SeaHasherBuilder>,
+ meta_cache: Arc<PartitionedMutex<PageMetaCache, SeaHasherBuilder>>,
}
#[derive(Debug, Clone)]
@@ -292,7 +293,11 @@ impl DiskCache {
Ok(Self {
root_dir,
- meta_cache: PartitionedMutex::try_new(init_lru, partition_bits,
SeaHasherBuilder {})?,
+ meta_cache: Arc::new(PartitionedMutex::try_new(
+ init_lru,
+ partition_bits,
+ SeaHasherBuilder {},
+ )?),
})
}
@@ -509,6 +514,7 @@ pub struct DiskCacheStore {
meta_cache: PartitionedMutex<LruCache<Path, FileMeta>, SeaHasherBuilder>,
underlying_store: Arc<dyn ObjectStore>,
request_notifiers: Arc<RequestNotifiers<String,
oneshot::Sender<StdResult<Bytes, Error>>>>,
+ runtime: RuntimeRef,
}
impl DiskCacheStore {
@@ -518,6 +524,7 @@ impl DiskCacheStore {
page_size: usize,
underlying_store: Arc<dyn ObjectStore>,
partition_bits: usize,
+ runtime: RuntimeRef,
) -> Result<Self> {
let page_num = cap / page_size;
ensure!(page_num != 0, InvalidCapacity);
@@ -550,6 +557,7 @@ impl DiskCacheStore {
meta_cache,
underlying_store,
request_notifiers,
+ runtime,
})
}
@@ -744,7 +752,17 @@ impl DiskCacheStore {
.zip(notifiers_vec.into_iter())
.zip(need_fetch_block_cache_key.into_iter())
{
- self.cache.insert_data(cache_key, bytes.clone()).await;
+ {
+ let cache = self.cache.clone();
+ let bytes = bytes.clone();
+ let handle = self
+ .runtime
+ .spawn(async move { cache.insert_data(cache_key,
bytes).await });
+ // In test, wait the handle to finish, otherwise the test may
fail.
+ if cfg!(test) {
+ let _ = handle.await;
+ }
+ }
for notifier in notifiers {
if notifier.send(Ok(bytes.clone())).is_err() {
// The error contains sent bytes, which maybe very large,
@@ -992,6 +1010,7 @@ impl ObjectStore for DiskCacheStore {
#[cfg(test)]
mod test {
+ use runtime::{Builder, RuntimeRef};
use tempfile::{tempdir, TempDir};
use upstream::local::LocalFileSystem;
@@ -1007,6 +1026,7 @@ mod test {
page_size: usize,
cap: usize,
partition_bits: usize,
+ runtime: RuntimeRef,
) -> StoreWithCacheDir {
let local_store = Arc::new(MemoryStore::default());
@@ -1017,6 +1037,7 @@ mod test {
page_size,
local_store,
partition_bits,
+ runtime,
)
.await
.unwrap();
@@ -1034,341 +1055,392 @@ mod test {
.exists()
}
- #[tokio::test]
- async fn test_disk_cache_out_of_range() {
- let page_size = 16;
- // 51 byte
- let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
- let location = Path::from("out_of_range_test.sst");
- let store = prepare_store(page_size, 32, 0).await;
- let buf = Bytes::from_static(data);
- store.inner.put(&location, buf.clone()).await.unwrap();
-
- // Read one page out of range.
- let res = store.inner.get_range(&location, 48..54).await;
- assert!(res.is_err());
-
- // Read multiple pages out of range.
- let res = store.inner.get_range(&location, 24..54).await;
- assert!(res.is_err());
+ #[test]
+ fn test_disk_cache_out_of_range() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let page_size = 16;
+ // 51 byte
+ let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
+ let location = Path::from("out_of_range_test.sst");
+ let store = prepare_store(page_size, 32, 0, rt.clone()).await;
+ let buf = Bytes::from_static(data);
+ store.inner.put(&location, buf.clone()).await.unwrap();
+
+ // Read one page out of range.
+ let res = store.inner.get_range(&location, 48..54).await;
+ assert!(res.is_err());
+
+ // Read multiple pages out of range.
+ let res = store.inner.get_range(&location, 24..54).await;
+ assert!(res.is_err());
+ });
}
- #[tokio::test]
- async fn test_disk_cache_store_get_range() {
- let page_size = 16;
- // 51 byte
- let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
- let location = Path::from("1.sst");
- let store = prepare_store(page_size, 1024, 0).await;
-
- let mut buf = BytesMut::with_capacity(data.len() * 4);
- // extend 4 times, then location will contain 200 bytes
- for _ in 0..4 {
- buf.extend_from_slice(data);
- }
- store.inner.put(&location, buf.freeze()).await.unwrap();
-
- let testcases = vec![
- (0..6, "a b c "),
- (0..16, "a b c d e f g h "),
- // len of aligned ranges will be 2
- (0..17, "a b c d e f g h i"),
- (16..17, "i"),
- // len of aligned ranges will be 6
- (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h i j
k l m n o p q r s t u v w x y"),
- ];
-
- for (input, expected) in testcases {
- assert_eq!(
- store.inner.get_range(&location, input).await.unwrap(),
- Bytes::copy_from_slice(expected.as_bytes())
- );
- }
-
- // remove cached values, then get again
- {
- for range in [0..16, 16..32, 32..48, 48..64, 64..80, 80..96,
96..112] {
- let data_cache = store
- .inner
- .cache
- .meta_cache
- .lock(&DiskCacheStore::page_cache_name(&location,
&range).as_str());
- assert!(data_cache
- .contains(DiskCacheStore::page_cache_name(&location,
&range).as_str()));
- assert!(test_file_exists(&store.cache_dir, &location, &range));
+ #[test]
+ fn test_disk_cache_store_get_range() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let page_size = 16;
+ // 51 byte
+ let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
+ let location = Path::from("1.sst");
+ let store = prepare_store(page_size, 1024, 0, rt.clone()).await;
+
+ let mut buf = BytesMut::with_capacity(data.len() * 4);
+ // extend 4 times, then location will contain 200 bytes
+ for _ in 0..4 {
+ buf.extend_from_slice(data);
+ }
+ store.inner.put(&location, buf.freeze()).await.unwrap();
+
+ let testcases = vec![
+ (0..6, "a b c "),
+ (0..16, "a b c d e f g h "),
+ // len of aligned ranges will be 2
+ (0..17, "a b c d e f g h i"),
+ (16..17, "i"),
+ // len of aligned ranges will be 6
+ (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h
i j k l m n o p q r s t u v w x y"),
+ ];
+
+ for (input, expected) in testcases {
+ assert_eq!(
+ store.inner.get_range(&location, input).await.unwrap(),
+ Bytes::copy_from_slice(expected.as_bytes())
+ );
}
- for range in [16..32, 48..64, 80..96] {
- let mut data_cache = store
- .inner
- .cache
- .meta_cache
- .lock(&DiskCacheStore::page_cache_name(&location,
&range).as_str());
- assert!(data_cache
- .pop(&DiskCacheStore::page_cache_name(&location, &range))
- .is_some());
+ // remove cached values, then get again
+ {
+ for range in [0..16, 16..32, 32..48, 48..64, 64..80, 80..96,
96..112] {
+ let data_cache = store
+ .inner
+ .cache
+ .meta_cache
+ .lock(&DiskCacheStore::page_cache_name(&location,
&range).as_str());
+ assert!(data_cache
+
.contains(DiskCacheStore::page_cache_name(&location, &range).as_str()));
+ assert!(test_file_exists(&store.cache_dir, &location,
&range));
+ }
+
+ for range in [16..32, 48..64, 80..96] {
+ let mut data_cache = store
+ .inner
+ .cache
+ .meta_cache
+ .lock(&DiskCacheStore::page_cache_name(&location,
&range).as_str());
+ assert!(data_cache
+ .pop(&DiskCacheStore::page_cache_name(&location,
&range))
+ .is_some());
+ }
}
- }
- assert_eq!(
- store.inner.get_range(&location, 16..100).await.unwrap(),
- Bytes::copy_from_slice(
- b"i j k l m n o p q r s t u v w x y za b c d e f g h i j k l m
n o p q r s t u v w x y"
- )
- );
+ assert_eq!(
+ store.inner.get_range(&location, 16..100).await.unwrap(),
+ Bytes::copy_from_slice(
+ b"i j k l m n o p q r s t u v w x y za b c d e f g h i j k
l m n o p q r s t u v w x y"
+ )
+ );
+
+ });
}
- #[tokio::test]
- async fn test_disk_cache_multi_thread_fetch_same_block() {
- let page_size = 16;
- // 51 byte
- let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
- let location = Path::from("1.sst");
- let store = Arc::new(prepare_store(page_size, 32, 0).await);
-
- let mut buf = BytesMut::with_capacity(data.len() * 4);
- // extend 4 times, then location will contain 200 bytes
- for _ in 0..4 {
- buf.extend_from_slice(data);
- }
- store.inner.put(&location, buf.freeze()).await.unwrap();
-
- let testcases = vec![
- (0..6, "a b c "),
- (0..16, "a b c d e f g h "),
- (0..17, "a b c d e f g h i"),
- (16..17, "i"),
- (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h i j
k l m n o p q r s t u v w x y"),
- ];
- let testcases = testcases
- .iter()
- .cycle()
- .take(testcases.len() * 100)
- .cloned()
- .collect::<Vec<_>>();
-
- let mut tasks = Vec::with_capacity(testcases.len());
- for (input, _) in &testcases {
- let store = store.clone();
- let location = location.clone();
- let input = input.clone();
-
- tasks.push(tokio::spawn(async move {
- store.inner.get_range(&location, input).await.unwrap()
- }));
- }
+ #[test]
+ fn test_disk_cache_multi_thread_fetch_same_block() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let page_size = 16;
+ // 51 byte
+ let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
+ let location = Path::from("1.sst");
+ let store = Arc::new(prepare_store(page_size, 32,
0,rt.clone()).await);
+
+ let mut buf = BytesMut::with_capacity(data.len() * 4);
+ // extend 4 times, then location will contain 200 bytes
+ for _ in 0..4 {
+ buf.extend_from_slice(data);
+ }
+ store.inner.put(&location, buf.freeze()).await.unwrap();
+
+ let testcases = vec![
+ (0..6, "a b c "),
+ (0..16, "a b c d e f g h "),
+ (0..17, "a b c d e f g h i"),
+ (16..17, "i"),
+ (16..100, "i j k l m n o p q r s t u v w x y za b c d e f g h
i j k l m n o p q r s t u v w x y"),
+ ];
+ let testcases = testcases
+ .iter()
+ .cycle()
+ .take(testcases.len() * 100)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ let mut tasks = Vec::with_capacity(testcases.len());
+ for (input, _) in &testcases {
+ let store = store.clone();
+ let location = location.clone();
+ let input = input.clone();
+
+ tasks.push(tokio::spawn(async move {
+ store.inner.get_range(&location, input).await.unwrap()
+ }));
+ }
- let actual = futures::future::join_all(tasks).await;
- for (actual, (_, expected)) in
actual.into_iter().zip(testcases.into_iter()) {
- assert_eq!(actual.unwrap(), Bytes::from(expected))
- }
+ let actual = futures::future::join_all(tasks).await;
+ for (actual, (_, expected)) in
actual.into_iter().zip(testcases.into_iter()) {
+ assert_eq!(actual.unwrap(), Bytes::from(expected))
+ }
+ });
}
- #[tokio::test]
- async fn test_disk_cache_remove_cache_file() {
- let page_size = 16;
- // 51 byte
- let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
- let location = Path::from("remove_cache_file.sst");
- let store = prepare_store(page_size, 32, 0).await;
- let mut buf = BytesMut::with_capacity(data.len() * 4);
- // extend 4 times, then location will contain 200 bytes, but cache cap
is 32
- for _ in 0..4 {
- buf.extend_from_slice(data);
- }
- store.inner.put(&location, buf.freeze()).await.unwrap();
-
- let _ = store.inner.get_range(&location, 0..16).await.unwrap();
- let _ = store.inner.get_range(&location, 16..32).await.unwrap();
- // cache is full now
- assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
- assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
-
- // insert new cache, evict oldest entry
- let _ = store.inner.get_range(&location, 32..48).await.unwrap();
- assert!(!test_file_exists(&store.cache_dir, &location, &(0..16)));
- assert!(test_file_exists(&store.cache_dir, &location, &(32..48)));
-
- // insert new cache, evict oldest entry
- let _ = store.inner.get_range(&location, 48..64).await.unwrap();
- assert!(!test_file_exists(&store.cache_dir, &location, &(16..32)));
- assert!(test_file_exists(&store.cache_dir, &location, &(48..64)));
+ #[test]
+ fn test_disk_cache_remove_cache_file() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let page_size = 16;
+ // 51 byte
+ let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
+ let location = Path::from("remove_cache_file.sst");
+ let store = prepare_store(page_size, 32, 0, rt.clone()).await;
+ let mut buf = BytesMut::with_capacity(data.len() * 4);
+ // extend 4 times, then location will contain 200 bytes, but cache
cap is 32
+ for _ in 0..4 {
+ buf.extend_from_slice(data);
+ }
+ store.inner.put(&location, buf.freeze()).await.unwrap();
+
+ let _ = store.inner.get_range(&location, 0..16).await.unwrap();
+ let _ = store.inner.get_range(&location, 16..32).await.unwrap();
+ // cache is full now
+ assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
+
+ // insert new cache, evict oldest entry
+ let _ = store.inner.get_range(&location, 32..48).await.unwrap();
+ assert!(!test_file_exists(&store.cache_dir, &location, &(0..16)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(32..48)));
+
+ // insert new cache, evict oldest entry
+ let _ = store.inner.get_range(&location, 48..64).await.unwrap();
+ assert!(!test_file_exists(&store.cache_dir, &location, &(16..32)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(48..64)));
+ });
}
- #[tokio::test]
- async fn test_disk_cache_remove_cache_file_two_partition() {
- let page_size = 16;
- // 51 byte
- let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
- let location = Path::from("remove_cache_file_two_partition.sst");
- // partition_cap: 64 / 16 / 2 = 2
- let store = prepare_store(page_size, 64, 1).await;
- let mut buf = BytesMut::with_capacity(data.len() * 8);
- // extend 8 times
- for _ in 0..8 {
- buf.extend_from_slice(data);
- }
- store.inner.put(&location, buf.freeze()).await.unwrap();
- // use seahash
- // 0..16: partition 1
- // 16..32 partition 1
- // 32..48 partition 0
- // 48..64 partition 1
- // 64..80 partition 1
- // 80..96 partition 0
- // 96..112 partition 0
- // 112..128 partition 0
- // 128..144 partition 0
- let _ = store.inner.get_range(&location, 0..16).await.unwrap();
- let _ = store.inner.get_range(&location, 16..32).await.unwrap();
- // partition 1 cache is full now
- assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
- assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
-
- let _ = store.inner.get_range(&location, 32..48).await.unwrap();
- let _ = store.inner.get_range(&location, 80..96).await.unwrap();
- // partition 0 cache is full now
-
- assert!(test_file_exists(&store.cache_dir, &location, &(32..48)));
- assert!(test_file_exists(&store.cache_dir, &location, &(80..96)));
-
- // insert new entry into partition 0, evict partition 0's oldest entry
- let _ = store.inner.get_range(&location, 96..112).await.unwrap();
- assert!(!test_file_exists(&store.cache_dir, &location, &(32..48)));
- assert!(test_file_exists(&store.cache_dir, &location, &(80..96)));
-
- assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
- assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
-
- // insert new entry into partition 0, evict partition 0's oldest entry
- let _ = store.inner.get_range(&location, 128..144).await.unwrap();
- assert!(!test_file_exists(&store.cache_dir, &location, &(80..96)));
- assert!(test_file_exists(&store.cache_dir, &location, &(96..112)));
- assert!(test_file_exists(&store.cache_dir, &location, &(128..144)));
-
- assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
- assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
-
- // insert new entry into partition 1, evict partition 1's oldest entry
- let _ = store.inner.get_range(&location, 64..80).await.unwrap();
- assert!(!test_file_exists(&store.cache_dir, &location, &(0..16)));
- assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
- assert!(test_file_exists(&store.cache_dir, &location, &(64..80)));
-
- assert!(test_file_exists(&store.cache_dir, &location, &(96..112)));
- assert!(test_file_exists(&store.cache_dir, &location, &(128..144)));
+ #[test]
+ fn test_disk_cache_remove_cache_file_two_partition() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let page_size = 16;
+ // 51 byte
+ let data = b"a b c d e f g h i j k l m n o p q r s t u v w x y z";
+ let location = Path::from("remove_cache_file_two_partition.sst");
+ // partition_cap: 64 / 16 / 2 = 2
+ let store = prepare_store(page_size, 64, 1, rt.clone()).await;
+ let mut buf = BytesMut::with_capacity(data.len() * 8);
+ // extend 8 times
+ for _ in 0..8 {
+ buf.extend_from_slice(data);
+ }
+ store.inner.put(&location, buf.freeze()).await.unwrap();
+ // use seahash
+ // 0..16: partition 1
+ // 16..32 partition 1
+ // 32..48 partition 0
+ // 48..64 partition 1
+ // 64..80 partition 1
+ // 80..96 partition 0
+ // 96..112 partition 0
+ // 112..128 partition 0
+ // 128..144 partition 0
+ let _ = store.inner.get_range(&location, 0..16).await.unwrap();
+ let _ = store.inner.get_range(&location, 16..32).await.unwrap();
+ // partition 1 cache is full now
+ assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
+
+ let _ = store.inner.get_range(&location, 32..48).await.unwrap();
+ let _ = store.inner.get_range(&location, 80..96).await.unwrap();
+ // partition 0 cache is full now
+
+ assert!(test_file_exists(&store.cache_dir, &location, &(32..48)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(80..96)));
+
+ // insert new entry into partition 0, evict partition 0's oldest
entry
+ let _ = store.inner.get_range(&location, 96..112).await.unwrap();
+ assert!(!test_file_exists(&store.cache_dir, &location, &(32..48)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(80..96)));
+
+ assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
+
+ // insert new entry into partition 0, evict partition 0's oldest
entry
+ let _ = store.inner.get_range(&location, 128..144).await.unwrap();
+ assert!(!test_file_exists(&store.cache_dir, &location, &(80..96)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(96..112)));
+ assert!(test_file_exists(&store.cache_dir, &location,
&(128..144)));
+
+ assert!(test_file_exists(&store.cache_dir, &location, &(0..16)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
+
+ // insert new entry into partition 1, evict partition 1's oldest
entry
+ let _ = store.inner.get_range(&location, 64..80).await.unwrap();
+ assert!(!test_file_exists(&store.cache_dir, &location, &(0..16)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(16..32)));
+ assert!(test_file_exists(&store.cache_dir, &location, &(64..80)));
+
+ assert!(test_file_exists(&store.cache_dir, &location, &(96..112)));
+ assert!(test_file_exists(&store.cache_dir, &location,
&(128..144)));
+ });
}
- #[tokio::test]
- async fn test_disk_cache_manifest() {
- let cache_dir = tempdir().unwrap();
- let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string();
- let page_size = 8;
- let first_create_time = {
- let _store = {
- let local_path = tempdir().unwrap();
- let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
- DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8,
local_store, 0)
+ #[test]
+ fn test_disk_cache_manifest() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let cache_dir = tempdir().unwrap();
+ let cache_root_dir =
cache_dir.as_ref().to_string_lossy().to_string();
+ let page_size = 8;
+ let first_create_time = {
+ let _store = {
+ let local_path = tempdir().unwrap();
+ let local_store =
+
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ DiskCacheStore::try_new(
+ cache_root_dir.clone(),
+ 160,
+ 8,
+ local_store,
+ 0,
+ rt.clone(),
+ )
.await
.unwrap()
+ };
+ let manifest =
+
DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size)
+ .await
+ .unwrap();
+
+ assert_eq!(manifest.page_size, 8);
+ assert_eq!(manifest.version, Manifest::CURRENT_VERSION);
+ manifest.create_at
};
- let manifest =
- DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir,
page_size)
+
+ // open again
+ {
+ let _store = {
+ let local_path = tempdir().unwrap();
+ let local_store =
+
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ DiskCacheStore::try_new(
+ cache_root_dir.clone(),
+ 160,
+ 8,
+ local_store,
+ 0,
+ rt.clone(),
+ )
.await
- .unwrap();
+ .unwrap()
+ };
- assert_eq!(manifest.page_size, 8);
- assert_eq!(manifest.version, Manifest::CURRENT_VERSION);
- manifest.create_at
- };
+ let manifest =
+
DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir, page_size)
+ .await
+ .unwrap();
+ assert_eq!(manifest.create_at, first_create_time);
+ assert_eq!(manifest.page_size, 8);
+ assert_eq!(manifest.version, Manifest::CURRENT_VERSION);
+ }
- // open again
- {
- let _store = {
+ // open again, but with different page_size
+ {
let local_path = tempdir().unwrap();
let local_store =
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
- DiskCacheStore::try_new(cache_root_dir.clone(), 160, 8,
local_store, 0)
- .await
- .unwrap()
- };
-
- let manifest =
- DiskCacheStore::create_manifest_if_not_exists(&cache_root_dir,
page_size)
- .await
- .unwrap();
- assert_eq!(manifest.create_at, first_create_time);
- assert_eq!(manifest.page_size, 8);
- assert_eq!(manifest.version, Manifest::CURRENT_VERSION);
- }
+ let store = DiskCacheStore::try_new(
+ cache_dir.as_ref().to_string_lossy().to_string(),
+ 160,
+ page_size * 2,
+ local_store,
+ 0,
+ rt.clone(),
+ )
+ .await;
- // open again, but with different page_size
- {
- let local_path = tempdir().unwrap();
- let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
- let store = DiskCacheStore::try_new(
- cache_dir.as_ref().to_string_lossy().to_string(),
- 160,
- page_size * 2,
- local_store,
- 0,
- )
- .await;
-
- assert!(store.is_err())
- }
+ assert!(store.is_err())
+ }
+ });
}
- #[tokio::test]
- async fn test_disk_cache_recovery() {
- let cache_dir = tempdir().unwrap();
- let cache_root_dir = cache_dir.as_ref().to_string_lossy().to_string();
- let page_size = 16;
- let location = Path::from("recovery.sst");
- {
- let store = {
- let local_path = tempdir().unwrap();
- let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
- DiskCacheStore::try_new(cache_root_dir.clone(), 10240,
page_size, local_store, 0)
+ #[test]
+ fn test_disk_cache_recovery() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ let cache_dir = tempdir().unwrap();
+ let cache_root_dir =
cache_dir.as_ref().to_string_lossy().to_string();
+ let page_size = 16;
+ let location = Path::from("recovery.sst");
+ {
+ let store = {
+ let local_path = tempdir().unwrap();
+ let local_store =
+
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ DiskCacheStore::try_new(
+ cache_root_dir.clone(),
+ 10240,
+ page_size,
+ local_store,
+ 0,
+ rt.clone(),
+ )
.await
.unwrap()
+ };
+ let data = b"abcd";
+ let mut buf = BytesMut::with_capacity(data.len() * 1024);
+ for _ in 0..1024 {
+ buf.extend_from_slice(data);
+ }
+ let buf = buf.freeze();
+ store.put(&location, buf.clone()).await.unwrap();
+ let read_range = 16..100;
+ let bytes = store
+ .get_range(&location, read_range.clone())
+ .await
+ .unwrap();
+ assert_eq!(bytes.len(), read_range.len());
+ assert_eq!(bytes[..], buf[read_range])
};
- let data = b"abcd";
- let mut buf = BytesMut::with_capacity(data.len() * 1024);
- for _ in 0..1024 {
- buf.extend_from_slice(data);
- }
- let buf = buf.freeze();
- store.put(&location, buf.clone()).await.unwrap();
- let read_range = 16..100;
- let bytes = store
- .get_range(&location, read_range.clone())
- .await
- .unwrap();
- assert_eq!(bytes.len(), read_range.len());
- assert_eq!(bytes[..], buf[read_range])
- };
- // recover
- {
- let store = {
- let local_path = tempdir().unwrap();
- let local_store =
-
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
- DiskCacheStore::try_new(cache_root_dir.clone(), 160,
page_size, local_store, 0)
+ // recover
+ {
+ let store = {
+ let local_path = tempdir().unwrap();
+ let local_store =
+
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+ DiskCacheStore::try_new(
+ cache_root_dir.clone(),
+ 160,
+ page_size,
+ local_store,
+ 0,
+ rt.clone(),
+ )
.await
.unwrap()
+ };
+ for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112]
{
+ let filename = DiskCacheStore::page_cache_name(&location,
&range);
+ let cache = store.cache.meta_cache.lock(&filename);
+ assert!(cache.contains(&filename));
+ assert!(test_file_exists(&cache_dir, &location, &range));
+ }
};
- for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112] {
- let filename = DiskCacheStore::page_cache_name(&location,
&range);
- let cache = store.cache.meta_cache.lock(&filename);
- assert!(cache.contains(&filename));
- assert!(test_file_exists(&cache_dir, &location, &range));
- }
- };
+ });
}
#[test]
@@ -1381,18 +1453,21 @@ mod test {
}
}
- #[tokio::test]
- async fn test_corrupt_disk_cache() {
- for page_size in [1, 2, 4, 8, 16, 32, 64, 128] {
- corrupt_disk_cache(page_size).await;
- }
+ #[test]
+ fn test_corrupt_disk_cache() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ rt.block_on(async {
+ for page_size in [1, 2, 4, 8, 16, 32, 64, 128] {
+ corrupt_disk_cache(page_size, rt.clone()).await;
+ }
+ });
}
- async fn corrupt_disk_cache(page_size: usize) {
+ async fn corrupt_disk_cache(page_size: usize, rt: RuntimeRef) {
let StoreWithCacheDir {
inner: store,
cache_dir,
- } = prepare_store(page_size, 1024, 0).await;
+ } = prepare_store(page_size, 1024, 0, rt).await;
let test_file_name = "corrupted_disk_cache_file";
let test_file_path = Path::from(test_file_name);
let test_file_bytes = Bytes::from("corrupted_disk_cache_file_data");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]