This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new dc10253f feat: add table status check (#1418)
dc10253f is described below
commit dc10253f31d7bac3ab46c44424584ebd2fb99b8f
Author: CooooolFrog <[email protected]>
AuthorDate: Mon Jan 8 09:57:15 2024 +0800
feat: add table status check (#1418)
## Rationale
Refer to this issue:
https://github.com/apache/incubator-horaedb/issues/1386. Currently, if
the status of the shard is abnormal, we cannot get any valid exception
information from the error message `table not found`.
## Detailed Changes
* Add `TableStatus` in `cluster`; it can be used to get the status of
the table in the current cluster.
* Add `SchemaWithCluster`, which wraps the schema inside the cluster
so that the state of the cluster and the schema can be combined.
## Test Plan
Pass CI.
---
catalog/src/schema.rs | 3 +
catalog_impls/src/cluster_based.rs | 115 +++++++++++++++++++++++++++++++++++++
catalog_impls/src/lib.rs | 1 +
catalog_impls/src/volatile.rs | 20 +++++--
cluster/src/cluster_impl.rs | 21 ++++++-
cluster/src/lib.rs | 24 +++++++-
cluster/src/shard_set.rs | 10 ++++
meta_client/src/types.rs | 5 ++
router/src/cluster_based.rs | 5 ++
src/ceresdb/src/setup.rs | 7 ++-
10 files changed, 202 insertions(+), 9 deletions(-)
diff --git a/catalog/src/schema.rs b/catalog/src/schema.rs
index 51fb7f82..c3997ca4 100644
--- a/catalog/src/schema.rs
+++ b/catalog/src/schema.rs
@@ -181,6 +181,9 @@ pub enum Error {
table: String,
backtrace: Backtrace,
},
+
+ #[snafu(display("Table is not ready, err:{}", source))]
+ TableNotReady { source: GenericError },
}
define_result!(Error);
diff --git a/catalog_impls/src/cluster_based.rs
b/catalog_impls/src/cluster_based.rs
new file mode 100644
index 00000000..650d2019
--- /dev/null
+++ b/catalog_impls/src/cluster_based.rs
@@ -0,0 +1,115 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use catalog::{
+ schema,
+ schema::{
+ CreateOptions, CreateTableRequest, DropOptions, DropTableRequest,
NameRef, Schema,
+ SchemaRef, TableNotReady,
+ },
+};
+use cluster::{ClusterRef, TableStatus};
+use generic_error::BoxError;
+use snafu::{ResultExt, Snafu};
+use table_engine::table::{SchemaId, TableRef};
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum Error {
+ #[snafu(display("Invalid table status, status:{:?}", status))]
+ InvalidTableStatus { status: TableStatus },
+}
+
+/// A cluster-based implementation for [`schema`].
+
+/// Schema with cluster.
+/// It binds cluster and schema and will detect the health status of the
cluster
+/// when calling the schema interface.
+pub(crate) struct SchemaWithCluster {
+ internal: SchemaRef,
+
+ cluster: ClusterRef,
+}
+
+impl SchemaWithCluster {
+ pub(crate) fn new(internal: SchemaRef, cluster: ClusterRef) ->
SchemaWithCluster {
+ SchemaWithCluster { internal, cluster }
+ }
+
+ // Get table status, return None when table not found in shard.
+ fn table_status(&self, table_name: NameRef) -> Option<TableStatus> {
+ self.cluster.get_table_status(self.name(), table_name)
+ }
+}
+
+#[async_trait]
+impl Schema for SchemaWithCluster {
+ fn name(&self) -> NameRef {
+ self.internal.name()
+ }
+
+ fn id(&self) -> SchemaId {
+ self.internal.id()
+ }
+
+ fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>>
{
+ let find_table_result = self.internal.table_by_name(name)?;
+
+ if find_table_result.is_none() {
+ return match self.table_status(name) {
+ // Table not found in schema and shard not contains this table.
+ None => Ok(None),
+ // Table not found in schema but shard contains this table.
+ // Check the status of the shard.
+ Some(table_status) => InvalidTableStatus {
+ status: table_status,
+ }
+ .fail()
+ .box_err()
+ .with_context(|| TableNotReady {})?,
+ };
+ }
+
+ Ok(find_table_result)
+ }
+
+ async fn create_table(
+ &self,
+ request: CreateTableRequest,
+ opts: CreateOptions,
+ ) -> schema::Result<TableRef> {
+ self.internal.create_table(request, opts).await
+ }
+
+ async fn drop_table(
+ &self,
+ request: DropTableRequest,
+ opts: DropOptions,
+ ) -> schema::Result<bool> {
+ self.internal.drop_table(request, opts).await
+ }
+
+ fn all_tables(&self) -> schema::Result<Vec<TableRef>> {
+ self.internal.all_tables()
+ }
+
+ fn register_table(&self, table: TableRef) {
+ self.internal.register_table(table)
+ }
+
+ fn unregister_table(&self, table_name: &str) {
+ self.internal.unregister_table(table_name)
+ }
+}
diff --git a/catalog_impls/src/lib.rs b/catalog_impls/src/lib.rs
index 2abbda95..90edc1b1 100644
--- a/catalog_impls/src/lib.rs
+++ b/catalog_impls/src/lib.rs
@@ -24,6 +24,7 @@ use system_catalog::{tables::Tables, SystemTableAdapter};
use crate::system_tables::{SystemTables, SystemTablesBuilder};
+mod cluster_based;
mod system_tables;
pub mod table_based;
pub mod volatile;
diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs
index e217d157..a2aa7b99 100644
--- a/catalog_impls/src/volatile.rs
+++ b/catalog_impls/src/volatile.rs
@@ -32,7 +32,7 @@ use catalog::{
},
Catalog, CatalogRef, CreateSchemaWithCause,
};
-use cluster::shard_set::ShardSet;
+use cluster::{shard_set::ShardSet, ClusterRef};
use common_types::schema::SchemaName;
use generic_error::BoxError;
use logger::{debug, info};
@@ -41,19 +41,23 @@ use snafu::{ensure, OptionExt, ResultExt};
use table_engine::table::{SchemaId, TableRef};
use tokio::sync::Mutex;
+use crate::cluster_based::SchemaWithCluster;
+
/// ManagerImpl manages multiple volatile catalogs.
pub struct ManagerImpl {
catalogs: HashMap<String, Arc<CatalogImpl>>,
shard_set: ShardSet,
meta_client: MetaClientRef,
+ cluster: ClusterRef,
}
impl ManagerImpl {
- pub fn new(shard_set: ShardSet, meta_client: MetaClientRef) -> Self {
+ pub fn new(shard_set: ShardSet, meta_client: MetaClientRef, cluster:
ClusterRef) -> Self {
let mut manager = ManagerImpl {
catalogs: HashMap::new(),
shard_set,
meta_client,
+ cluster,
};
manager.maybe_create_default_catalog();
@@ -101,6 +105,7 @@ impl ManagerImpl {
schemas: RwLock::new(HashMap::new()),
shard_set: self.shard_set.clone(),
meta_client: self.meta_client.clone(),
+ cluster: self.cluster.clone(),
});
self.catalogs.insert(catalog_name, catalog.clone());
@@ -121,6 +126,7 @@ struct CatalogImpl {
schemas: RwLock<HashMap<SchemaName, SchemaRef>>,
shard_set: ShardSet,
meta_client: MetaClientRef,
+ cluster: ClusterRef,
}
#[async_trait]
@@ -171,7 +177,10 @@ impl Catalog for CatalogImpl {
self.shard_set.clone(),
));
- schemas.insert(name.to_string(), schema);
+ let cluster_based: SchemaRef =
+ Arc::new(SchemaWithCluster::new(schema, self.cluster.clone()));
+
+ schemas.insert(name.to_string(), cluster_based);
info!(
"create schema success, catalog:{}, schema:{}",
@@ -282,7 +291,10 @@ impl Schema for SchemaImpl {
}
fn table_by_name(&self, name: NameRef) -> schema::Result<Option<TableRef>>
{
- let table = self.tables.read().unwrap().get(name).cloned();
+ let table = self
+ .get_table(self.catalog_name.as_str(), self.schema_name.as_str(),
name)
+ .unwrap()
+ .clone();
Ok(table)
}
diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs
index 6804081a..3ec73a00 100644
--- a/cluster/src/cluster_impl.rs
+++ b/cluster/src/cluster_impl.rs
@@ -44,7 +44,7 @@ use crate::{
topology::ClusterTopology,
Cluster, ClusterNodesNotFound, ClusterNodesResp,
EtcdClientFailureWithCause,
InitEtcdClientConfig, InvalidArguments, MetaClientFailure, OpenShard,
OpenShardWithCause,
- Result, ShardNotFound,
+ Result, ShardNotFound, TableStatus,
};
/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
@@ -311,6 +311,19 @@ impl Inner {
self.shard_set.get(shard_id)
}
+ /// Get shard by table name.
+ ///
+ /// This method is similar to `route_tables`, but it will not send request
+ /// to meta server, it only load data from local cache.
+    /// If the target table is not found in any shard in this cluster, return
None.
+    /// Otherwise, return the shard where this table exists.
+ fn get_shard_by_table_name(&self, schema_name: &str, table_name: &str) ->
Option<ShardRef> {
+ let shards = self.shard_set.all_shards();
+ shards
+ .into_iter()
+ .find(|shard| shard.find_table(schema_name, table_name).is_some())
+ }
+
fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef> {
info!("Remove shard from shard_set, id:{shard_id}");
self.shard_set
@@ -368,6 +381,12 @@ impl Cluster for ClusterImpl {
self.inner.shard(shard_id)
}
+ fn get_table_status(&self, schema_name: &str, table_name: &str) ->
Option<TableStatus> {
+ self.inner
+ .get_shard_by_table_name(schema_name, table_name)
+ .map(|shard| TableStatus::from(shard.get_status()))
+ }
+
async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef> {
self.inner.close_shard(shard_id)
}
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
index be7374d7..f1326529 100644
--- a/cluster/src/lib.rs
+++ b/cluster/src/lib.rs
@@ -29,7 +29,8 @@ use common_types::schema::SchemaName;
use generic_error::GenericError;
use macros::define_result;
use meta_client::types::{
- ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId,
ShardInfo, ShardVersion,
+ ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId,
ShardInfo, ShardStatus,
+ ShardVersion,
};
use shard_lock_manager::ShardLockManagerRef;
use snafu::{Backtrace, Snafu};
@@ -161,6 +162,23 @@ pub enum Error {
define_result!(Error);
+#[derive(Debug)]
+pub enum TableStatus {
+ Ready,
+ Recovering,
+ Frozen,
+}
+
+impl From<ShardStatus> for TableStatus {
+ fn from(value: ShardStatus) -> Self {
+ match value {
+ ShardStatus::Init | ShardStatus::Opening =>
TableStatus::Recovering,
+ ShardStatus::Ready => TableStatus::Ready,
+ ShardStatus::Frozen => TableStatus::Frozen,
+ }
+ }
+}
+
pub type ClusterRef = Arc<dyn Cluster + Send + Sync>;
#[derive(Clone, Debug)]
@@ -184,12 +202,14 @@ pub trait Cluster {
/// None.
fn shard(&self, shard_id: ShardId) -> Option<ShardRef>;
+ fn get_table_status(&self, schema_name: &str, table_name: &str) ->
Option<TableStatus>;
+
/// Close shard.
///
/// Return error if the shard is not found.
async fn close_shard(&self, shard_id: ShardId) -> Result<ShardRef>;
- /// list shards
+ /// list loaded shards in current node.
fn list_shards(&self) -> Vec<ShardInfo>;
async fn route_tables(&self, req: &RouteTablesRequest) ->
Result<RouteTablesResponse>;
diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs
index b815c604..00bbb9ef 100644
--- a/cluster/src/shard_set.rs
+++ b/cluster/src/shard_set.rs
@@ -132,11 +132,21 @@ impl Shard {
ret
}
+ pub fn get_status(&self) -> ShardStatus {
+ let data = self.data.read().unwrap();
+ data.shard_info.status.clone()
+ }
+
pub fn is_opened(&self) -> bool {
let data = self.data.read().unwrap();
data.is_opened()
}
+ pub fn is_frozen(&self) -> bool {
+ let data = self.data.read().unwrap();
+ data.is_frozen()
+ }
+
pub async fn close(&self, ctx: CloseContext) -> Result<()> {
let operator = self.operator.lock().await;
operator.close(ctx).await
diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs
index f822428e..f23e8c54 100644
--- a/meta_client/src/types.rs
+++ b/meta_client/src/types.rs
@@ -226,6 +226,11 @@ impl ShardInfo {
pub fn is_opened(&self) -> bool {
matches!(self.status, ShardStatus::Ready | ShardStatus::Frozen)
}
+
+ #[inline]
+ pub fn is_ready(&self) -> bool {
+ matches!(self.status, ShardStatus::Ready)
+ }
}
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize)]
diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs
index 83d2b266..04f30c84 100644
--- a/router/src/cluster_based.rs
+++ b/router/src/cluster_based.rs
@@ -201,6 +201,7 @@ mod tests {
use ceresdbproto::storage::{RequestContext, RouteRequest as
RouteRequestPb};
use cluster::{
shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster,
ClusterNodesResp,
+ TableStatus,
};
use common_types::table::ShardId;
use meta_client::types::{
@@ -230,6 +231,10 @@ mod tests {
unimplemented!();
}
+ fn get_table_status(&self, _: &str, _: &str) -> Option<TableStatus> {
+ unimplemented!()
+ }
+
async fn close_shard(&self, _: ShardId) -> cluster::Result<ShardRef> {
unimplemented!();
}
diff --git a/src/ceresdb/src/setup.rs b/src/ceresdb/src/setup.rs
index a7ae6a31..d6d4e770 100644
--- a/src/ceresdb/src/setup.rs
+++ b/src/ceresdb/src/setup.rs
@@ -334,8 +334,11 @@ async fn build_with_meta<T: WalsOpener>(
};
let engine_proxy = build_table_engine_proxy(engine_builder).await;
- let meta_based_manager_ref =
- Arc::new(volatile::ManagerImpl::new(shard_set, meta_client.clone()));
+ let meta_based_manager_ref = Arc::new(volatile::ManagerImpl::new(
+ shard_set,
+ meta_client.clone(),
+ cluster.clone(),
+ ));
// Build catalog manager.
let catalog_manager =
Arc::new(CatalogManagerImpl::new(meta_based_manager_ref));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]