This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch memtable-poc
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/memtable-poc by this push:
new a8036e94 chore: attach endpoint to remote error
a8036e94 is described below
commit a8036e94b2c1ab0b63e0833174e8c6d4cb42ac67
Author: jiacai2050 <[email protected]>
AuthorDate: Thu Jan 4 14:20:33 2024 +0800
chore: attach endpoint to remote error
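    The routed endpoint is now carried on the remote engine client's Server error and
    filled in on every remote call path (read stream, write, batched write, alter table
    schema/options, get table info, and remote plan execution), so a failing remote
    request names the node that produced the error. Because the endpoint value is
    environment specific, the partition-table integration test masks it in the expected
    output with a SQLNESS REPLACE rule.

    A minimal sketch of the idea, not the repository code: Endpoint below is a
    simplified stand-in for router::endpoint::Endpoint, the Server variant mirrors the
    extended error in remote_engine_client::error, and check_header is a hypothetical
    helper illustrating the call-site pattern.

        // Sketch only: simplified types; the real definitions live in
        // remote_engine_client/src/lib.rs and router/src/endpoint.rs.
        use snafu::Snafu;

        #[derive(Debug, Clone)]
        pub struct Endpoint {
            pub addr: String,
            pub port: u16,
        }

        impl std::fmt::Display for Endpoint {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}:{}", self.addr, self.port)
            }
        }

        #[derive(Debug, Snafu)]
        pub enum Error {
            // Mirrors the extended Server variant: the endpoint joins the table
            // identifiers, status code and message in the rendered error text.
            #[snafu(display(
                "Failed to query from table in server, table_idents:{:?}, endpoint:{}, code:{}, msg:{}",
                table_idents,
                endpoint,
                code,
                msg
            ))]
            Server {
                endpoint: Endpoint,
                table_idents: Vec<String>, // the real field is Vec<TableIdentifier>
                code: u32,
                msg: String,
            },
        }

        // Hypothetical helper: call sites clone the endpoint out of the route
        // context and attach it whenever a remote response carries a non-OK header.
        fn check_header(endpoint: &Endpoint, code: u32, msg: &str) -> Result<(), Error> {
            if code != 200 { // stand-in for !status_code::is_ok(code)
                return Err(Error::Server {
                    endpoint: endpoint.clone(),
                    table_idents: vec![],
                    code,
                    msg: msg.to_string(),
                });
            }
            Ok(())
        }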
---
.github/workflows/ci.yml | 1 +
.../cases/env/cluster/ddl/partition_table.result | 3 +-
.../cases/env/cluster/ddl/partition_table.sql | 1 +
interpreters/src/table_manipulator/meta_based.rs | 2 +-
remote_engine_client/src/client.rs | 54 ++++++++++++++++++----
remote_engine_client/src/lib.rs | 5 +-
6 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 13b4773a..25ea726d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,6 +20,7 @@ on:
push:
branches:
- main
+ - dev
paths-ignore:
- 'docs/**'
- 'etc/**'
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 623be786..3520852a 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -82,9 +82,10 @@ ALTER TABLE partition_table_t ADD COLUMN (b string);
affected_rows: 0
+-- SQLNESS REPLACE endpoint:(.*?), endpoint:xx,
INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10, "ceresdb0", 100);
-Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute insert, err:Failed to write table, err:Failed to write tables in batch, tables:[\"__partition_table_t_1\"], err:Failed to query from table in server, table_idents:[TableIdentifier { catalog: \"ceresdb\", schema: \"public\", table: \"__partition_table_t_1\" }], code:401, msg:failed to decode row group payload. Cause [...]
+Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute insert, err:Failed to write table, err:Failed to write tables in batch, tables:[\"__partition_table_t_1\"], err:Failed to query from table in server, table_idents:[TableIdentifier { catalog: \"ceresdb\", schema: \"public\", table: \"__partition_table_t_1\" }], endpoint:xx, code:401, msg:failed to decode row group p [...]
ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 76fc3986..6b3dafb7 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -37,6 +37,7 @@ SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7
ALTER TABLE partition_table_t ADD COLUMN (b string);
+-- SQLNESS REPLACE endpoint:(.*?), endpoint:xx,
INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10, "ceresdb0", 100);
ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
diff --git a/interpreters/src/table_manipulator/meta_based.rs b/interpreters/src/table_manipulator/meta_based.rs
index bdecfd3e..2e6f8e69 100644
--- a/interpreters/src/table_manipulator/meta_based.rs
+++ b/interpreters/src/table_manipulator/meta_based.rs
@@ -126,7 +126,7 @@ impl TableManipulator for TableManipulatorImpl {
.await
.box_err()
.context(DropWithCause {
- msg: format!("failed to create table by meta client,
req:{req:?}"),
+ msg: format!("failed to drop table by meta client,
req:{req:?}"),
})?;
info!(
diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs
index 80e47ad6..22dd36a8 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -33,7 +33,7 @@ use common_types::{record_batch::RecordBatch, schema::RecordSchema};
use futures::{Stream, StreamExt};
use generic_error::BoxError;
use logger::{error, info};
-use router::RouterRef;
+use router::{endpoint::Endpoint, RouterRef};
use runtime::Runtime;
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
@@ -115,8 +115,18 @@ impl Client {
// When success to get the stream, table has been found in remote, not need to
// evict cache entry.
let response = response.into_inner();
+<<<<<<< ours
let remote_read_record_batch_stream =
ClientReadRecordBatchStream::new(table_ident, response,
record_schema);
+=======
+ let remote_read_record_batch_stream = ClientReadRecordBatchStream::new(
+ route_context.endpoint,
+ table_ident,
+ response,
+ record_schema,
+ Default::default(),
+ );
+>>>>>>> theirs
Ok(remote_read_record_batch_stream)
}
@@ -127,6 +137,7 @@ impl Client {
// Write to remote.
let table_ident = request.table.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb = request.convert_into_pb().box_err().context(Convert {
msg: "Failed to convert WriteRequest to pb",
})?;
@@ -144,6 +155,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint,
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -179,9 +191,9 @@ impl Client {
}
// Merge according to endpoint.
- let mut remote_writes = Vec::with_capacity(write_batch_contexts_by_endpoint.len());
+ let mut write_handles = Vec::with_capacity(write_batch_contexts_by_endpoint.len());
let mut written_tables = Vec::with_capacity(write_batch_contexts_by_endpoint.len());
- for (_, context) in write_batch_contexts_by_endpoint {
+ for (endpoint, context) in write_batch_contexts_by_endpoint {
// Write to remote.
let WriteBatchContext {
table_idents,
@@ -196,18 +208,18 @@ impl Client {
rpc_client
.write_batch(Request::new(batch_request_pb))
.await
+ .map(|v| (v, endpoint.clone()))
.box_err()
});
- remote_writes.push(handle);
+ write_handles.push(handle);
written_tables.push(table_idents);
}
- let mut results = Vec::with_capacity(remote_writes.len());
- for (table_idents, remote_write) in written_tables.into_iter().zip(remote_writes) {
- let batch_result = remote_write.await;
+ let mut results = Vec::with_capacity(write_handles.len());
+ for (table_idents, handle) in written_tables.into_iter().zip(write_handles) {
// If it's a runtime error, don't evict entries from route cache.
- let batch_result = match batch_result.box_err() {
+ let batch_result = match handle.await.box_err() {
Ok(result) => result,
Err(e) => {
results.push(WriteBatchResult {
@@ -219,10 +231,12 @@ impl Client {
};
// Check remote write result then.
- let result = batch_result.and_then(|response| {
+ let result = batch_result.and_then(|result| {
+ let (response, endpoint) = result;
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint,
table_idents: table_idents.clone(),
code: header.code,
msg: header.error.clone(),
@@ -252,6 +266,7 @@ impl Client {
let route_context = self.cached_router.route(&request.table_ident).await?;
let table_ident = request.table_ident.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb: ceresdbproto::remote_engine::AlterTableSchemaRequest = request.into();
let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);
@@ -271,6 +286,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint: endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -310,6 +326,7 @@ impl Client {
let route_context = self.cached_router.route(&request.table_ident).await?;
let table_ident = request.table_ident.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb: ceresdbproto::remote_engine::AlterTableOptionsRequest = request.into();
let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);
@@ -328,6 +345,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint: endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -363,6 +381,7 @@ impl Client {
// Find the channel from router firstly.
let route_context = self.cached_router.route(&request.table).await?;
let table_ident = request.table.clone();
+ let endpoint = route_context.endpoint.clone();
let request_pb = ceresdbproto::remote_engine::GetTableInfoRequest::try_from(request)
.box_err()
.context(Convert {
@@ -383,6 +402,7 @@ impl Client {
let response = response.into_inner();
if let Some(header) = &response.header &&
!status_code::is_ok(header.code) {
Server {
+ endpoint: endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: header.code,
msg: header.error.clone(),
@@ -395,6 +415,7 @@ impl Client {
match result {
Ok(response) => {
let table_info = response.table_info.context(Server {
+ endpoint: endpoint.clone(),
table_idents: vec![table_ident.clone()],
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table info is empty",
@@ -415,6 +436,7 @@ impl Client {
msg: "Failed to covert table schema",
})?
.with_context(|| Server {
+ endpoint,
table_idents: vec![table_ident],
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table schema is empty",
@@ -481,8 +503,18 @@ impl Client {
// When success to get the stream, table has been found in remote, not need to
// evict cache entry.
let response = response.into_inner();
+<<<<<<< ours
let remote_execute_plan_stream =
ClientReadRecordBatchStream::new(table_ident, response,
plan_schema);
+=======
+ let remote_execute_plan_stream = ClientReadRecordBatchStream::new(
+ route_context.endpoint,
+ table_ident,
+ response,
+ plan_schema,
+ request.remote_metrics,
+ );
+>>>>>>> theirs
Ok(remote_execute_plan_stream)
}
@@ -497,6 +529,7 @@ impl Client {
}
pub struct ClientReadRecordBatchStream {
+ endpoint: Endpoint,
pub table_ident: TableIdentifier,
pub response_stream: Streaming<remote_engine::ReadResponse>,
pub record_schema: RecordSchema,
@@ -504,11 +537,13 @@ pub struct ClientReadRecordBatchStream {
impl ClientReadRecordBatchStream {
pub fn new(
+ endpoint: Endpoint,
table_ident: TableIdentifier,
response_stream: Streaming<remote_engine::ReadResponse>,
record_schema: RecordSchema,
) -> Self {
Self {
+ endpoint,
table_ident,
response_stream,
record_schema,
@@ -526,6 +561,7 @@ impl Stream for ClientReadRecordBatchStream {
// Check header.
if let Some(header) = response.header &&
!status_code::is_ok(header.code) {
return Poll::Ready(Some(Server {
+ endpoint: this.endpoint.clone(),
table_idents: vec![this.table_ident.clone()],
code: header.code,
msg: header.error,
diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs
index c88026f3..055f4083 100644
--- a/remote_engine_client/src/lib.rs
+++ b/remote_engine_client/src/lib.rs
@@ -54,6 +54,7 @@ use self::client::{Client, ClientReadRecordBatchStream};
pub mod error {
use generic_error::GenericError;
use macros::define_result;
+ use router::endpoint::Endpoint;
use snafu::{Backtrace, Snafu};
use table_engine::remote::model::TableIdentifier;
@@ -93,12 +94,14 @@ pub mod error {
},
#[snafu(display(
- "Failed to query from table in server, table_idents:{:?}, code:{},
msg:{}",
+ "Failed to query from table in server, table_idents:{:?},
endpoint:{}, code:{}, msg:{}",
table_idents,
+ endpoint.to_string(),
code,
msg
))]
Server {
+ endpoint: Endpoint,
table_idents: Vec<TableIdentifier>,
code: u32,
msg: String,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]