This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch service_discovery
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/service_discovery by this push:
new 6c424a2 implement service discovery
new 8cc0477 Merge pull request #51 from robberphex/zk
6c424a2 is described below
commit 6c424a2bd7cc09940159cc0ae1027869ee1fbaf3
Author: luyanbo <[email protected]>
AuthorDate: Fri Nov 4 13:16:02 2022 +0800
implement service discovery
---
Cargo.toml | 1 +
dubbo-build/src/client.rs | 19 +-
dubbo/Cargo.toml | 1 +
dubbo/src/cluster/directory.rs | 95 +++++++++
dubbo/src/cluster/mod.rs | 18 ++
dubbo/src/codegen.rs | 6 +
dubbo/src/invocation.rs | 33 +++
dubbo/src/lib.rs | 1 +
dubbo/src/registry/memory_registry.rs | 12 +-
dubbo/src/registry/mod.rs | 25 ++-
dubbo/src/triple/client/triple.rs | 101 ++++++++-
examples/echo/Cargo.toml | 3 +
examples/echo/src/protos/hello_echo.rs | 12 +-
examples/greeter/Cargo.toml | 3 +
examples/greeter/proto/greeter.proto | 2 +-
examples/greeter/src/greeter/client.rs | 107 +++-------
registry-zookeeper/Cargo.toml | 14 ++
registry-zookeeper/LICENSE | 202 ++++++++++++++++++
registry-zookeeper/src/lib.rs | 28 +++
registry-zookeeper/src/zookeeper_registry.rs | 307 +++++++++++++++++++++++++++
20 files changed, 895 insertions(+), 95 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 140c079..d3942a6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
members = [
"xds",
"registry",
+ "registry-zookeeper",
"metadata",
"common",
"config",
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 10fa446..01cd89c 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -100,6 +100,11 @@ pub fn generate<T: Service>(
}
}
+ pub fn with_directory(mut self, directory: Box<dyn Directory>)
-> Self {
+ self.inner = self.inner.with_directory(directory);
+ self
+ }
+
#methods
}
@@ -117,6 +122,12 @@ fn generate_methods<T: Service>(
let package = if emit_package { service.package() } else { "" };
for method in service.methods() {
+ let service_unique_name= format!(
+ "{}{}{}",
+ package,
+ if package.is_empty() { "" } else { "." },
+ service.identifier()
+ );
let path = format!(
"/{}{}{}/{}",
package,
@@ -128,7 +139,7 @@ fn generate_methods<T: Service>(
stream.extend(generate_doc_comments(method.comment()));
let method = match (method.client_streaming(),
method.server_streaming()) {
- (false, false) => generate_unary(&method, proto_path,
compile_well_known_types, path),
+ (false, false) => generate_unary(service_unique_name, &method,
proto_path, compile_well_known_types, path),
(false, true) => {
generate_server_streaming(&method, proto_path,
compile_well_known_types, path)
}
@@ -145,6 +156,7 @@ fn generate_methods<T: Service>(
}
fn generate_unary<T: Method>(
+ service_unique_name: String,
method: &T,
proto_path: &str,
compile_well_known_types: bool,
@@ -153,6 +165,7 @@ fn generate_unary<T: Method>(
let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
let ident = format_ident!("{}", method.name());
let (request, response) = method.request_response_name(proto_path,
compile_well_known_types);
+ let method_name = method.identifier();
quote! {
pub async fn #ident(
@@ -160,12 +173,16 @@ fn generate_unary<T: Method>(
request: Request<#request>,
) -> Result<Response<#response>, dubbo::status::Status> {
let codec = #codec_name::<#request, #response>::default();
+ let invocation = RpcInvocation::default()
+ .with_servie_unique_name(String::from(#service_unique_name))
+ .with_method_name(String::from(#method_name));
let path = http::uri::PathAndQuery::from_static(#path);
self.inner
.unary(
request,
codec,
path,
+ invocation,
)
.await
}
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 7b38a28..4161601 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -24,6 +24,7 @@ async-trait = "0.1.56"
tower-layer = "0.3"
bytes = "1.0"
pin-project = "1.0"
+rand = "0.8.5"
serde_json = "1.0.82"
serde = {version="1.0.138", features = ["derive"]}
futures = "0.3"
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
new file mode 100644
index 0000000..9f02386
--- /dev/null
+++ b/dubbo/src/cluster/directory.rs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::{Arc, RwLock};
+
+use crate::common::url::Url;
+use crate::invocation::{Invocation, RpcInvocation};
+use crate::registry::memory_registry::MemoryNotifyListener;
+use crate::registry::{BoxRegistry, RegistryWrapper};
+
+/// A source of candidate provider URLs for an RPC invocation.
+///
+/// Implementors must be `Debug` and must provide [`DirectoryClone`], which is
+/// what allows a `Box<dyn Directory>` to be cloned.
+pub trait Directory: Debug + DirectoryClone {
+    /// Returns the URLs that belong to `invocation`.
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+}
+
+/// Object-safe cloning hook for [`Directory`] trait objects.
+pub trait DirectoryClone {
+    /// Returns a boxed copy of `self`.
+    fn clone_box(&self) -> Box<dyn Directory>;
+}
+
+/// Every `'static` directory that is also `Clone` gets `clone_box` for free.
+impl<T: 'static + Directory + Clone> DirectoryClone for T {
+    fn clone_box(&self) -> Box<dyn Directory> {
+        let copy: T = self.clone();
+        Box::new(copy)
+    }
+}
+
+/// Clones a boxed directory by delegating to [`DirectoryClone::clone_box`].
+impl Clone for Box<dyn Directory> {
+    fn clone(&self) -> Self {
+        // Call through the trait object itself rather than through the box.
+        (**self).clone_box()
+    }
+}
+
+/// A [`Directory`] backed by a service registry.
+///
+/// Both the registry handle and the instance cache are reference-counted, so
+/// clones share one registry and observe the same cached URLs.
+///
+/// `DirectoryClone` comes from the blanket impl for `Directory + Clone` types,
+/// so `clone_box` no longer panics with `todo!()`.
+#[derive(Debug, Clone)]
+pub struct RegistryDirectory {
+    /// Registry used to subscribe to provider changes.
+    registry: Arc<RegistryWrapper>,
+    /// Provider URLs, keyed by the event key delivered to the notify listener.
+    service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
+
+impl RegistryDirectory {
+    /// Creates a directory that resolves providers through `registry`,
+    /// starting with an empty instance cache.
+    pub fn new(registry: BoxRegistry) -> RegistryDirectory {
+        RegistryDirectory {
+            registry: Arc::new(RegistryWrapper {
+                registry: Some(registry),
+            }),
+            service_instances: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}
+
+impl Directory for RegistryDirectory {
+    /// Returns the provider URLs currently cached for the invocation's target
+    /// service.
+    ///
+    /// Each call subscribes a new [`MemoryNotifyListener`] that shares this
+    /// directory's instance cache, then reads the cache entry stored under the
+    /// service's unique name. An empty vector is returned when there is no
+    /// entry.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no registry is configured, if the subscription fails, or if
+    /// the instance cache lock is poisoned.
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+        let service_name = invocation.get_target_service_unique_name();
+
+        // NOTE(review): the address in the subscription URL is hard-coded;
+        // presumably a placeholder for the real consumer address -- confirm.
+        let url = Url::from_url(&format!(
+            "triple://{}:{}/{}",
+            "127.0.0.1", "8888", service_name
+        ))
+        .expect("subscription URL built from the service name must be parseable");
+
+        // NOTE(review): a listener is registered on every call; verify that the
+        // registry tolerates repeated subscriptions for the same service.
+        self.registry
+            .registry
+            .as_ref()
+            .expect("RegistryDirectory has no registry configured")
+            .subscribe(
+                url,
+                MemoryNotifyListener {
+                    service_instances: Arc::clone(&self.service_instances),
+                },
+            )
+            .expect("failed to subscribe to the registry");
+
+        let instances = self
+            .service_instances
+            .read()
+            .expect("service instance cache lock poisoned");
+        instances.get(&service_name).cloned().unwrap_or_default()
+    }
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
new file mode 100644
index 0000000..84980ea
--- /dev/null
+++ b/dubbo/src/cluster/mod.rs
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod directory;
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 9ffce89..6a2e74d 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -24,6 +24,9 @@ pub use http_body::Body;
pub use hyper::Body as hyperBody;
pub use tower_service::Service;
+pub use super::registry::Registry;
+pub use super::registry::BoxRegistry;
+pub use super::registry::RegistryWrapper;
pub use super::invocation::{IntoStreamingRequest, Request, Response};
pub use super::protocol::triple::triple_invoker::TripleInvoker;
pub use super::protocol::Invoker;
@@ -36,6 +39,9 @@ pub use super::triple::server::service::{
};
pub use super::triple::server::TripleServer;
pub use super::{empty_body, BoxBody, BoxFuture, StdError};
+pub use super::invocation::RpcInvocation;
+pub use super::cluster::directory::Directory;
+pub use super::cluster::directory::RegistryDirectory;
pub use crate::filter::service::FilterService;
pub use crate::filter::Filter;
pub use crate::triple::client::connection::Connection;
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 5a80e9a..85d971f 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -189,3 +189,36 @@ impl Metadata {
header
}
}
+
+/// Read access to the target of an RPC call.
+pub trait Invocation {
+    /// Returns the unique name of the service being called.
+    fn get_target_service_unique_name(&self) -> String;
+    /// Returns the name of the method being called.
+    fn get_method_name(&self) -> String;
+}
+
+/// Identifies the target of a single RPC call: a service and one of its
+/// methods. Both names default to the empty string.
+#[derive(Debug, Clone, Default)]
+pub struct RpcInvocation {
+    target_service_unique_name: String,
+    method_name: String,
+}
+
+impl RpcInvocation {
+    /// Sets the unique name of the target service and returns the invocation.
+    pub fn with_service_unique_name(mut self, service_unique_name: String) -> Self {
+        self.target_service_unique_name = service_unique_name;
+        self
+    }
+
+    /// Misspelled alias of [`RpcInvocation::with_service_unique_name`], kept so
+    /// that existing callers keep compiling.
+    pub fn with_servie_unique_name(self, service_unique_name: String) -> Self {
+        self.with_service_unique_name(service_unique_name)
+    }
+
+    /// Sets the name of the invoked method and returns the invocation.
+    pub fn with_method_name(mut self, method_name: String) -> Self {
+        self.method_name = method_name;
+        self
+    }
+}
+
+
+/// Exposes the names stored on an [`RpcInvocation`] as owned strings.
+impl Invocation for RpcInvocation {
+    /// Returns a copy of the target service's unique name.
+    fn get_target_service_unique_name(&self) -> String {
+        self.target_service_unique_name.to_owned()
+    }
+
+    /// Returns a copy of the invoked method's name.
+    fn get_method_name(&self) -> String {
+        self.method_name.to_owned()
+    }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 8ad9fa0..e685cba 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -16,6 +16,7 @@
*/
pub mod codegen;
+pub mod cluster;
pub mod common;
pub mod filter;
mod framework;
diff --git a/dubbo/src/registry/memory_registry.rs
b/dubbo/src/registry/memory_registry.rs
index 67df5c7..f1ff25f 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -20,6 +20,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
+use crate::common::url::Url;
+
use super::{NotifyListener, Registry};
// 从url中获取服务注册的元数据
@@ -100,11 +102,17 @@ impl Registry for MemoryRegistry {
}
}
-pub struct MemoryNotifyListener {}
+pub struct MemoryNotifyListener {
+ pub service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
impl NotifyListener for MemoryNotifyListener {
fn notify(&self, event: super::ServiceEvent) {
- todo!()
+        // Choose the action first: an unsupported action then panics before
+        // the write guard exists, so it cannot poison `service_instances`.
+        match event.action.as_str() {
+            "ADD" => {
+                // Replace whatever was cached under this key with the new list.
+                self.service_instances
+                    .write()
+                    .expect("service_instances lock poisoned")
+                    .insert(event.key, event.service);
+            }
+            // Only "ADD" is implemented so far.
+            _ => todo!(),
+        }
}
fn notify_all(&self, event: super::ServiceEvent) {
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index cbae16b..8c56692 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -19,6 +19,8 @@
pub mod memory_registry;
pub mod protocol;
+use std::fmt::Debug;
+
use crate::common::url::Url;
pub trait Registry {
@@ -37,10 +39,27 @@ pub trait NotifyListener {
}
pub struct ServiceEvent {
- key: String,
- action: String,
- service: Url,
+ pub key: String,
+ pub action: String,
+ pub service: Vec<Url>,
}
pub type BoxRegistry =
Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> +
Send + Sync>;
+
+/// Optional holder for a boxed [`Registry`] whose listener type is
+/// `MemoryNotifyListener`. The default value holds no registry.
+#[derive(Default)]
+pub struct RegistryWrapper {
+    pub registry: Option<Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener>>>,
+}
+
+impl Clone for RegistryWrapper {
+    /// Produces an empty wrapper: the boxed registry is not copied, so the
+    /// clone's `registry` is always `None`.
+    fn clone(&self) -> Self {
+        RegistryWrapper { registry: None }
+    }
+}
+
+impl Debug for RegistryWrapper {
+    /// Prints only the type name; the wrapped registry is not shown.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = f.debug_struct("RegistryWrapper");
+        repr.finish()
+    }
+}
diff --git a/dubbo/src/triple/client/triple.rs
b/dubbo/src/triple/client/triple.rs
index 36c9ce9..27db0ad 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -18,13 +18,17 @@
use std::str::FromStr;
use futures_util::{future, stream, StreamExt, TryStreamExt};
+
use http::HeaderValue;
+use rand::prelude::SliceRandom;
use tower_service::Service;
use super::connection::Connection;
+use crate::codegen::{Directory, RpcInvocation};
use crate::filter::service::FilterService;
use crate::filter::Filter;
use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
+
use crate::triple::codec::Codec;
use crate::triple::compression::CompressionEncoding;
use crate::triple::decode::Decoding;
@@ -35,6 +39,7 @@ pub struct TripleClient<T> {
host: Option<http::Uri>,
inner: T,
send_compression_encoding: Option<CompressionEncoding>,
+ directory: Option<Box<dyn Directory>>,
}
impl TripleClient<Connection> {
@@ -51,6 +56,7 @@ impl TripleClient<Connection> {
host: uri.clone(),
inner: Connection::new().with_host(uri.unwrap()),
send_compression_encoding: Some(CompressionEncoding::Gzip),
+ directory: None,
}
}
}
@@ -61,6 +67,7 @@ impl<T> TripleClient<T> {
host,
inner,
send_compression_encoding: Some(CompressionEncoding::Gzip),
+ directory: None,
}
}
@@ -70,6 +77,14 @@ impl<T> TripleClient<T> {
{
TripleClient::new(FilterService::new(self.inner, filter), self.host)
}
+
+    /// Attaches the service `directory` to this client, replacing any directory
+    /// set earlier, and returns the client.
+    pub fn with_directory(mut self, directory: Box<dyn Directory>) -> Self {
+        self.directory = Some(directory);
+        self
+    }
}
impl<T> TripleClient<T>
@@ -77,6 +92,78 @@ where
T: Service<http::Request<hyper::Body>, Response =
http::Response<crate::BoxBody>>,
T::Error: Into<crate::Error>,
{
+    /// Builds the HTTP/2 `POST` request for a call sent to `uri`.
+    ///
+    /// `path` replaces the path-and-query of `uri`, `body` becomes the request
+    /// body, and the Triple/gRPC headers are derived from the resulting URI.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the URI cannot be reassembled with `path`, if it lacks a
+    /// scheme or authority, or if one of its parts is not a valid header value.
+    fn new_map_request(
+        &self,
+        uri: http::Uri,
+        path: http::uri::PathAndQuery,
+        body: hyper::Body,
+    ) -> http::Request<hyper::Body> {
+        // Headers whose values never vary, listed in insertion order.
+        // NOTE(review): "tri-service-version" and "tri-unit-info" carry the
+        // user-agent string; this looks like a placeholder -- confirm.
+        const FIXED_HEADERS: [(&str, &str); 6] = [
+            ("content-type", "application/grpc+proto"),
+            ("user-agent", "dubbo-rust/0.1.0"),
+            ("te", "trailers"),
+            ("tri-service-version", "dubbo-rust/0.1.0"),
+            ("tri-service-group", "cluster"),
+            ("tri-unit-info", "dubbo-rust/0.1.0"),
+        ];
+
+        let mut parts = uri.into_parts();
+        parts.path_and_query = Some(path);
+        let target = http::Uri::from_parts(parts).unwrap();
+
+        // The version is fixed by the builder, so no later override is needed.
+        let mut req = hyper::Request::builder()
+            .version(http::Version::HTTP_2)
+            .uri(target.clone())
+            .method("POST")
+            .body(body)
+            .unwrap();
+
+        let headers = req.headers_mut();
+
+        // Headers derived from the request line and the target URI.
+        headers.insert("method", HeaderValue::from_static("POST"));
+        headers.insert(
+            "scheme",
+            HeaderValue::from_str(target.scheme_str().unwrap()).unwrap(),
+        );
+        headers.insert("path", HeaderValue::from_str(target.path()).unwrap());
+        headers.insert(
+            "authority",
+            HeaderValue::from_str(target.authority().unwrap().as_str()).unwrap(),
+        );
+
+        for (name, value) in FIXED_HEADERS {
+            headers.insert(name, HeaderValue::from_static(value));
+        }
+
+        // NOTE(review): "gzip" is announced whenever any send encoding is
+        // configured, whichever encoding that is -- confirm this is intended.
+        if self.send_compression_encoding.is_some() {
+            headers.insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
+        }
+        headers.insert(
+            "grpc-accept-encoding",
+            http::HeaderValue::from_static("gzip"),
+        );
+
+        // Triple headers from the reference constant list that are not sent
+        // yet: tri-attachment, tri-req-id, tri-trace-traceid, tri-trace-rpcid,
+        // tri-trace-proto-bin.
+        req
+    }
+
fn map_request(
&self,
path: http::uri::PathAndQuery,
@@ -114,7 +201,7 @@ where
);
req.headers_mut().insert(
"content-type",
- HeaderValue::from_static("application/grpc+json"),
+ HeaderValue::from_static("application/grpc+proto"),
);
req.headers_mut()
.insert("user-agent",
HeaderValue::from_static("dubbo-rust/0.1.0"));
@@ -159,6 +246,7 @@ where
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
+ invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
@@ -174,10 +262,15 @@ where
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
- let req = self.map_request(path, body);
+ let url_list = self.directory.as_ref().expect("msg").list(invocation);
+ let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+ let http_uri =
+ http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- let response = self
- .inner
+ let req = self.new_map_request(http_uri.clone(), path, body);
+
+ let mut conn = Connection::new().with_host(http_uri);
+ let response = conn
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 9e794b0..f82ef80 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -23,8 +23,11 @@ prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
+hyper = { version = "0.14.19", features = ["full"]}
+
dubbo = {path = "../../dubbo", version = "0.2.0"}
dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version =
"0.2.0"}
[build-dependencies]
dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/echo/src/protos/hello_echo.rs
b/examples/echo/src/protos/hello_echo.rs
index ce8ce09..bb2389f 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/protos/hello_echo.rs
@@ -64,14 +64,10 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
- let path = http::uri::PathAndQuery::from_static(
- "/grpc.examples.echo.Echo/UnaryEcho",
- );
- self.inner.unary(request, codec, path).await
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest,
super::EchoResponse>::default();
+ let path =
http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
+ self.inner.unary(request, codec, path,
RpcInvocation::default()).await
}
/// ServerStreamingEcho is server side streaming.
pub async fn server_streaming_echo(
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index b698bc7..c4b20cc 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -22,9 +22,12 @@ prost-derive = {version = "0.10", optional = true}
prost = "0.10.4"
async-trait = "0.1.56"
tokio-stream = "0.1"
+tracing = "0.1"
+tracing-subscriber = "0.2.0"
dubbo = {path = "../../dubbo", version = "0.2.0"}
dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version =
"0.2.0"}
[build-dependencies]
dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/greeter/proto/greeter.proto
b/examples/greeter/proto/greeter.proto
index 0d8be79..a0c466a 100644
--- a/examples/greeter/proto/greeter.proto
+++ b/examples/greeter/proto/greeter.proto
@@ -29,7 +29,7 @@ message GreeterReply {
string message = 1;
}
-service Greeter{
+service Greeter {
// unary
rpc greet(GreeterRequest) returns (GreeterReply);
diff --git a/examples/greeter/src/greeter/client.rs
b/examples/greeter/src/greeter/client.rs
index 2004b6f..93d509a 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -19,14 +19,40 @@ pub mod protos {
#![allow(non_camel_case_types)]
include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
}
+use std::{str::FromStr, time::Duration, env};
-use dubbo::codegen::*;
-use futures_util::StreamExt;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+
+use dubbo::{cluster::directory::RegistryDirectory, codegen::*,
invocation::RpcInvocation};
+use http;
use protos::{greeter_client::GreeterClient, GreeterRequest};
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() {
- let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
+ // a builder for `FmtSubscriber`.
+ let subscriber = FmtSubscriber::builder()
+ // all spans/events at DEBUG level or more severe (debug, info, warn, error)
+ // will be written to stdout.
+ .with_max_level(Level::DEBUG)
+ // completes the builder.
+ .finish();
+
+ tracing::subscriber::set_global_default(subscriber).expect("setting
default subscriber failed");
+
+ let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+ Ok(val) => val,
+ Err(_) => "localhost:2181".to_string(),
+ };
+ let zkr = ZookeeperRegistry::new(&zk_connect_string);
+ let directory = RegistryDirectory::new(Box::new(zkr));
+
+ let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap();
+
+ let mut cli = GreeterClient::new(Connection::new().with_host(http_uri));
+ cli = cli.with_directory(Box::new(directory));
+ //let mut cli =
GreeterClient::connect("http://127.0.0.1:8888".to_string());
println!("# unary call");
let resp = cli
@@ -41,78 +67,7 @@ async fn main() {
let (_parts, body) = resp.into_parts();
println!("Response: {:?}", body);
- println!("# client stream");
- let data = vec![
- GreeterRequest {
- name: "msg1 from client streaming".to_string(),
- },
- GreeterRequest {
- name: "msg2 from client streaming".to_string(),
- },
- GreeterRequest {
- name: "msg3 from client streaming".to_string(),
- },
- ];
- let req = futures_util::stream::iter(data);
- let resp = cli.greet_client_stream(req).await;
- let client_streaming_resp = match resp {
- Ok(resp) => resp,
- Err(err) => return println!("{:?}", err),
- };
- let (_parts, resp_body) = client_streaming_resp.into_parts();
- println!("client streaming, Response: {:?}", resp_body);
-
- println!("# bi stream");
- let data = vec![
- GreeterRequest {
- name: "msg1 from client".to_string(),
- },
- GreeterRequest {
- name: "msg2 from client".to_string(),
- },
- GreeterRequest {
- name: "msg3 from client".to_string(),
- },
- ];
- let req = futures_util::stream::iter(data);
-
- let bidi_resp = cli.greet_stream(req).await.unwrap();
-
- let (parts, mut body) = bidi_resp.into_parts();
- println!("parts: {:?}", parts);
- while let Some(item) = body.next().await {
- match item {
- Ok(v) => {
- println!("reply: {:?}", v);
- }
- Err(err) => {
- println!("err: {:?}", err);
- }
- }
- }
- let trailer = body.trailer().await.unwrap();
- println!("trailer: {:?}", trailer);
-
- println!("# server stream");
- let resp = cli
- .greet_server_stream(Request::new(GreeterRequest {
- name: "server streaming req".to_string(),
- }))
- .await
- .unwrap();
-
- let (parts, mut body) = resp.into_parts();
- println!("parts: {:?}", parts);
- while let Some(item) = body.next().await {
- match item {
- Ok(v) => {
- println!("reply: {:?}", v);
- }
- Err(err) => {
- println!("err: {:?}", err);
- }
- }
+ loop {
+ tokio::time::sleep(Duration::from_millis(100)).await;
}
- let trailer = body.trailer().await.unwrap();
- println!("trailer: {:?}", trailer);
}
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
new file mode 100644
index 0000000..cf040fa
--- /dev/null
+++ b/registry-zookeeper/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "dubbo-registry-zookeeper"
+version = "0.2.0"
+edition = "2021"
+license = "Apache-2.0"
+
+# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+zookeeper = "0.6.1"
+dubbo = {path = "../dubbo/", version = "0.2.0"}
+serde_json = "1.0"
+serde = {version = "1.0.145",features = ["derive"]}
+tracing = "0.1"
diff --git a/registry-zookeeper/LICENSE b/registry-zookeeper/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/registry-zookeeper/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs
new file mode 100644
index 0000000..7481129
--- /dev/null
+++ b/registry-zookeeper/src/lib.rs
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+pub mod zookeeper_registry;
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn it_works() {
+ let result = 2 + 2;
+ assert_eq!(result, 4);
+ }
+}
diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs
new file mode 100644
index 0000000..6c2ef91
--- /dev/null
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#![allow(unused_variables, dead_code, missing_docs)]
+
+use dubbo::common::url::Url;
+use dubbo::registry::memory_registry::MemoryNotifyListener;
+use dubbo::registry::NotifyListener;
+use dubbo::registry::Registry;
+use dubbo::registry::ServiceEvent;
+use dubbo::StdError;
+use serde::{Deserialize, Serialize};
+use tracing::info;
+use zookeeper::Acl;
+use zookeeper::CreateMode;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::time::Duration;
+use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+// 从url中获取服务注册的元数据
+/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+/// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+
+pub const REGISTRY_GROUP_KEY: &str = "registry.group";
+
+struct LoggingWatcher;
+impl Watcher for LoggingWatcher {
+ fn handle(&self, e: WatchedEvent) {
+ println!("{:?}", e)
+ }
+}
+
+//#[derive(Debug)]
+pub struct ZookeeperRegistry {
+ root_path: String,
+ zk_client: Arc<ZooKeeper>,
+
+ listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>,
+}
+
+pub struct MyNotifyListener {}
+
+impl NotifyListener for MyNotifyListener {
+ fn notify(&self, event: dubbo::registry::ServiceEvent) {
+ todo!()
+ }
+
+ fn notify_all(&self, event: dubbo::registry::ServiceEvent) {
+ todo!()
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ZkServiceInstance {
+ name: String,
+ address: String,
+ port: i32,
+}
+
+impl ZkServiceInstance {
+ pub fn get_service_name(&self) -> &str {
+ self.name.as_str()
+ }
+
+ pub fn get_host(&self) -> &str {
+ self.address.as_str()
+ }
+
+ pub fn get_port(&self) -> i32 {
+ self.port
+ }
+}
+
+impl ZookeeperRegistry {
+ pub fn new(connect_string: &str) -> ZookeeperRegistry {
+ let zk_client =
+ ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+ ZookeeperRegistry {
+ root_path: "/services".to_string(),
+ zk_client: Arc::new(zk_client),
+
+ listeners: RwLock::new(HashMap::new()),
+ }
+ }
+
+ fn create_listener(
+ &self,
+ path: String,
+ service_name: String,
+ listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>,
+ ) -> ServiceInstancesChangedListener {
+ let mut service_names = HashSet::new();
+ service_names.insert(service_name.clone());
+ return ServiceInstancesChangedListener {
+ zk_client: Arc::clone(&self.zk_client),
+ path: path,
+
+ service_name: service_name.clone(),
+ listener: listener,
+ };
+ }
+
+ fn get_app_name(&self, service_name: String) -> String {
+ let res = self
+ .zk_client
+ .get_data(&("/dubbo/mapping/".to_owned() + &service_name), false);
+
+ let x = res.unwrap().0;
+ let s = match std::str::from_utf8(&x) {
+ Ok(v) => v,
+ Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
+ };
+ s.to_string()
+ }
+}
+
+impl Registry for ZookeeperRegistry {
+ type NotifyListener = MemoryNotifyListener;
+
+ fn register(&mut self, url: Url) -> Result<(), StdError> {
+ todo!();
+ }
+
+ fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+ todo!();
+ }
+
+ fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+ let binding = url.get_service_name();
+ let service_name = binding.get(0).unwrap();
+ let app_name = self.get_app_name(service_name.clone());
+ let path = self.root_path.clone() + "/" + &app_name;
+ if self.listeners.read().unwrap().get(service_name).is_some() {
+ return Ok(());
+ }
+
+ let arc_listener = Arc::new(listener);
+ self.listeners
+ .write()
+ .unwrap()
+ .insert(service_name.to_string(), Arc::clone(&arc_listener));
+
+ let zk_listener = self.create_listener(
+ path.clone(),
+ service_name.to_string(),
+ Arc::clone(&arc_listener),
+ );
+
+ let res = self.zk_client.get_children_w(&path, zk_listener);
+ let result: Vec<Url> = res
+ .unwrap()
+ .iter()
+ .map(|node_key| {
+ let zk_res = self.zk_client.get_data(
+ &(self.root_path.clone() + "/" + &app_name + "/" + &node_key),
+ false,
+ );
+ let vec_u8 = zk_res.unwrap().0;
+ let sstr = std::str::from_utf8(&vec_u8).unwrap();
+ let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap();
+ let url = Url::from_url(&format!(
+ "triple://{}:{}/{}",
+ instance.get_host(),
+ instance.get_port(),
+ service_name
+ ))
+ .unwrap();
+ url
+ })
+ .collect();
+
+ info!("notifing {}->{:?}", service_name, result);
+ arc_listener.notify(ServiceEvent {
+ key: service_name.to_string(),
+ action: String::from("ADD"),
+ service: result,
+ });
+ Ok(())
+ }
+
+ fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+ todo!()
+ }
+}
+
+pub struct ServiceInstancesChangedListener {
+ zk_client: Arc<ZooKeeper>,
+ path: String,
+
+ service_name: String,
+ listener: Arc<MemoryNotifyListener>,
+}
+
+impl Watcher for ServiceInstancesChangedListener {
+ fn handle(&self, event: WatchedEvent) {
+ if let (WatchedEventType::NodeChildrenChanged, Some(path)) = (event.event_type, event.path)
+ {
+ let event_path = path.clone();
+ let dirs = self
+ .zk_client
+ .get_children(&event_path.clone(), false)
+ .expect("msg");
+ let result: Vec<Url> = dirs
+ .iter()
+ .map(|node_key| {
+ let zk_res = self
+ .zk_client
+ .get_data(&(event_path.clone() + "/" + node_key), false);
+ let vec_u8 = zk_res.unwrap().0;
+ let sstr = std::str::from_utf8(&vec_u8).unwrap();
+ let instance: ZkServiceInstance = serde_json::from_str(sstr).unwrap();
+ let url = Url::from_url(&format!(
+ "triple://{}:{}/{}",
+ instance.get_host(),
+ instance.get_port(),
+ self.service_name
+ ))
+ .unwrap();
+ url
+ })
+ .collect();
+
+ let res = self.zk_client.get_children_w(
+ &path,
+ ServiceInstancesChangedListener {
+ zk_client: Arc::clone(&self.zk_client),
+ path: path.clone(),
+
+ service_name: self.service_name.clone(),
+ listener: Arc::clone(&self.listener),
+ },
+ );
+
+ info!("notifing {}->{:?}", self.service_name, result);
+ self.listener.notify(ServiceEvent {
+ key: self.service_name.clone(),
+ action: String::from("ADD"),
+ service: result,
+ });
+ }
+ }
+}
+
+impl NotifyListener for ServiceInstancesChangedListener {
+ fn notify(&self, event: ServiceEvent) {
+ todo!()
+ }
+
+ fn notify_all(&self, event: ServiceEvent) {
+ todo!()
+ }
+}
+
+#[test]
+fn it_works() {
+ let connect_string = &"mse-21b397d4-p.zk.mse.aliyuncs.com:2181";
+ let zk_client =
+ ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+ let watcher = Arc::new(Some(TestZkWatcher {
+ watcher: Arc::new(None),
+ }));
+ watcher.as_ref().expect("").watcher = Arc::clone(&watcher);
+ let x = watcher.as_ref().expect("");
+ zk_client.get_children_w("/test", x);
+ zk_client.delete("/test/a", None);
+ zk_client.delete("/test/b", None);
+ let zk_res = zk_client.create(
+ "/test/a",
+ vec![1, 3],
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ let zk_res = zk_client.create(
+ "/test/b",
+ vec![1, 3],
+ Acl::open_unsafe().clone(),
+ CreateMode::Ephemeral,
+ );
+ zk_client.close();
+}
+
+struct TestZkWatcher {
+ pub watcher: Arc<Option<TestZkWatcher>>,
+}
+
+impl Watcher for TestZkWatcher {
+ fn handle(&self, event: WatchedEvent) {
+ println!("event: {:?}", event);
+ }
+}