This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch service_discovery
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/service_discovery by this push:
     new 6c424a2  implement service discovery
     new 8cc0477  Merge pull request #51 from robberphex/zk
6c424a2 is described below

commit 6c424a2bd7cc09940159cc0ae1027869ee1fbaf3
Author: luyanbo <[email protected]>
AuthorDate: Fri Nov 4 13:16:02 2022 +0800

    implement service discovery
---
 Cargo.toml                                   |   1 +
 dubbo-build/src/client.rs                    |  19 +-
 dubbo/Cargo.toml                             |   1 +
 dubbo/src/cluster/directory.rs               |  95 +++++++++
 dubbo/src/cluster/mod.rs                     |  18 ++
 dubbo/src/codegen.rs                         |   6 +
 dubbo/src/invocation.rs                      |  33 +++
 dubbo/src/lib.rs                             |   1 +
 dubbo/src/registry/memory_registry.rs        |  12 +-
 dubbo/src/registry/mod.rs                    |  25 ++-
 dubbo/src/triple/client/triple.rs            | 101 ++++++++-
 examples/echo/Cargo.toml                     |   3 +
 examples/echo/src/protos/hello_echo.rs       |  12 +-
 examples/greeter/Cargo.toml                  |   3 +
 examples/greeter/proto/greeter.proto         |   2 +-
 examples/greeter/src/greeter/client.rs       | 107 +++-------
 registry-zookeeper/Cargo.toml                |  14 ++
 registry-zookeeper/LICENSE                   | 202 ++++++++++++++++++
 registry-zookeeper/src/lib.rs                |  28 +++
 registry-zookeeper/src/zookeeper_registry.rs | 307 +++++++++++++++++++++++++++
 20 files changed, 895 insertions(+), 95 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 140c079..d3942a6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
 members = [
   "xds",
   "registry",
+  "registry-zookeeper",
   "metadata",
   "common",
   "config",
diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 10fa446..01cd89c 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -100,6 +100,11 @@ pub fn generate<T: Service>(
                     }
                 }
 
+                pub fn with_directory(mut self, directory: Box<dyn Directory>) -> Self {
+                    self.inner = self.inner.with_directory(directory);
+                    self
+                }
+
                 #methods
 
             }
@@ -117,6 +122,12 @@ fn generate_methods<T: Service>(
     let package = if emit_package { service.package() } else { "" };
 
     for method in service.methods() {
+        let service_unique_name= format!(
+            "{}{}{}",
+            package,
+            if package.is_empty() { "" } else { "." },
+            service.identifier()
+        );
         let path = format!(
             "/{}{}{}/{}",
             package,
@@ -128,7 +139,7 @@ fn generate_methods<T: Service>(
         stream.extend(generate_doc_comments(method.comment()));
 
         let method = match (method.client_streaming(), method.server_streaming()) {
-            (false, false) => generate_unary(&method, proto_path, compile_well_known_types, path),
+            (false, false) => generate_unary(service_unique_name, &method, proto_path, compile_well_known_types, path),
             (false, true) => {
                 generate_server_streaming(&method, proto_path, compile_well_known_types, path)
             }
@@ -145,6 +156,7 @@ fn generate_methods<T: Service>(
 }
 
 fn generate_unary<T: Method>(
+    service_unique_name: String,
     method: &T,
     proto_path: &str,
     compile_well_known_types: bool,
@@ -153,6 +165,7 @@ fn generate_unary<T: Method>(
     let codec_name = syn::parse_str::<syn::Path>(CODEC_PATH).unwrap();
     let ident = format_ident!("{}", method.name());
     let (request, response) = method.request_response_name(proto_path, compile_well_known_types);
+    let method_name = method.identifier();
 
     quote! {
         pub async fn #ident(
@@ -160,12 +173,16 @@ fn generate_unary<T: Method>(
             request: Request<#request>,
         ) -> Result<Response<#response>, dubbo::status::Status> {
            let codec = #codec_name::<#request, #response>::default();
+           let invocation = RpcInvocation::default()
+            .with_servie_unique_name(String::from(#service_unique_name))
+            .with_method_name(String::from(#method_name));
            let path = http::uri::PathAndQuery::from_static(#path);
            self.inner
             .unary(
                 request,
                 codec,
                 path,
+                invocation,
             )
             .await
         }
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 7b38a28..4161601 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -24,6 +24,7 @@ async-trait = "0.1.56"
 tower-layer = "0.3"
 bytes = "1.0"
 pin-project = "1.0"
+rand = "0.8.5"
 serde_json = "1.0.82"
 serde = {version="1.0.138", features = ["derive"]}
 futures = "0.3"
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
new file mode 100644
index 0000000..9f02386
--- /dev/null
+++ b/dubbo/src/cluster/directory.rs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::{Arc, RwLock};
+
+use crate::common::url::Url;
+use crate::invocation::{Invocation, RpcInvocation};
+use crate::registry::memory_registry::MemoryNotifyListener;
+use crate::registry::{BoxRegistry, RegistryWrapper};
+
+pub trait Directory: Debug + DirectoryClone {
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url>;
+}
+
+pub trait DirectoryClone {
+    fn clone_box(&self) -> Box<dyn Directory>;
+}
+
+impl<T> DirectoryClone for T
+where
+    T: 'static + Directory + Clone,
+{
+    fn clone_box(&self) -> Box<dyn Directory> {
+        Box::new(self.clone())
+    }
+}
+
+impl Clone for Box<dyn Directory> {
+    fn clone(&self) -> Box<dyn Directory> {
+        self.clone_box()
+    }
+}
+
+#[derive(Debug)]
+pub struct RegistryDirectory {
+    registry: RegistryWrapper,
+    service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
+
+impl RegistryDirectory {
+    pub fn new(registry: BoxRegistry) -> RegistryDirectory {
+        RegistryDirectory {
+            registry: RegistryWrapper {
+                registry: Some(registry),
+            },
+            service_instances: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}
+
+impl DirectoryClone for RegistryDirectory {
+    fn clone_box(&self) -> Box<dyn Directory> {
+        todo!()
+    }
+}
+
+impl Directory for RegistryDirectory {
+    fn list(&self, invocation: RpcInvocation) -> Vec<Url> {
+        let service_name = invocation.get_target_service_unique_name();
+
+        let url = Url::from_url(&format!(
+            "triple://{}:{}/{}",
+            "127.0.0.1", "8888", service_name
+        ))
+        .unwrap();
+
+        self.registry.registry.as_ref().expect("msg").subscribe(
+            url,
+            MemoryNotifyListener {
+                service_instances: Arc::clone(&self.service_instances),
+            },
+        ).expect("subscribe");
+
+        let map = self.service_instances.read().expect("service_instances.read");
+        let binding = Vec::new();
+        let url_vec = map.get(&service_name).unwrap_or(&binding);
+        url_vec.to_vec()
+    }
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
new file mode 100644
index 0000000..84980ea
--- /dev/null
+++ b/dubbo/src/cluster/mod.rs
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod directory;
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 9ffce89..6a2e74d 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -24,6 +24,9 @@ pub use http_body::Body;
 pub use hyper::Body as hyperBody;
 pub use tower_service::Service;
 
+pub use super::registry::Registry;
+pub use super::registry::BoxRegistry;
+pub use super::registry::RegistryWrapper;
 pub use super::invocation::{IntoStreamingRequest, Request, Response};
 pub use super::protocol::triple::triple_invoker::TripleInvoker;
 pub use super::protocol::Invoker;
@@ -36,6 +39,9 @@ pub use super::triple::server::service::{
 };
 pub use super::triple::server::TripleServer;
 pub use super::{empty_body, BoxBody, BoxFuture, StdError};
+pub use super::invocation::RpcInvocation;
+pub use super::cluster::directory::Directory;
+pub use super::cluster::directory::RegistryDirectory;
 pub use crate::filter::service::FilterService;
 pub use crate::filter::Filter;
 pub use crate::triple::client::connection::Connection;
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index 5a80e9a..85d971f 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -189,3 +189,36 @@ impl Metadata {
         header
     }
 }
+
+pub trait Invocation {
+    fn get_target_service_unique_name(&self) -> String;
+    fn get_method_name(&self) -> String;
+}
+
+#[derive(Default)]
+pub struct RpcInvocation {
+    target_service_unique_name: String,
+    method_name: String,
+}
+
+impl RpcInvocation{
+    pub fn with_servie_unique_name(mut self, service_unique_name: String) -> Self {
+        self.target_service_unique_name = service_unique_name;
+        self
+    }
+    pub fn with_method_name(mut self, method_name: String) -> Self {
+        self.method_name = method_name;
+        self
+    }
+}
+
+
+impl Invocation for RpcInvocation {
+    fn get_target_service_unique_name(&self) -> String {
+        self.target_service_unique_name.clone()
+    }
+
+    fn get_method_name(&self) -> String {
+        self.method_name.clone()
+    }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 8ad9fa0..e685cba 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -16,6 +16,7 @@
  */
 
 pub mod codegen;
+pub mod cluster;
 pub mod common;
 pub mod filter;
 mod framework;
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index 67df5c7..f1ff25f 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -20,6 +20,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::RwLock;
 
+use crate::common::url::Url;
+
 use super::{NotifyListener, Registry};
 
 // 从url中获取服务注册的元数据
@@ -100,11 +102,17 @@ impl Registry for MemoryRegistry {
     }
 }
 
-pub struct MemoryNotifyListener {}
+pub struct MemoryNotifyListener {
+    pub service_instances: Arc<RwLock<HashMap<String, Vec<Url>>>>,
+}
 
 impl NotifyListener for MemoryNotifyListener {
     fn notify(&self, event: super::ServiceEvent) {
-        todo!()
+        let mut map=self.service_instances.write().expect("msg");
+        match event.action.as_str() {
+            "ADD"=> map.insert(event.key, event.service),
+            &_ => todo!()
+        };
     }
 
     fn notify_all(&self, event: super::ServiceEvent) {
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index cbae16b..8c56692 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -19,6 +19,8 @@
 pub mod memory_registry;
 pub mod protocol;
 
+use std::fmt::Debug;
+
 use crate::common::url::Url;
 
 pub trait Registry {
@@ -37,10 +39,27 @@ pub trait NotifyListener {
 }
 
 pub struct ServiceEvent {
-    key: String,
-    action: String,
-    service: Url,
+    pub key: String,
+    pub action: String,
+    pub service: Vec<Url>,
 }
 
 pub type BoxRegistry =
     Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener> + Send + Sync>;
+
+#[derive(Default)]
+pub struct RegistryWrapper {
+    pub registry: Option<Box<dyn Registry<NotifyListener = memory_registry::MemoryNotifyListener>>>,
+}
+
+impl Clone for RegistryWrapper {
+    fn clone(&self) -> Self {
+        Self { registry: None }
+    }
+}
+
+impl Debug for RegistryWrapper {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RegistryWrapper").finish()
+    }
+}
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 36c9ce9..27db0ad 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -18,13 +18,17 @@
 use std::str::FromStr;
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
+
 use http::HeaderValue;
+use rand::prelude::SliceRandom;
 use tower_service::Service;
 
 use super::connection::Connection;
+use crate::codegen::{Directory, RpcInvocation};
 use crate::filter::service::FilterService;
 use crate::filter::Filter;
 use crate::invocation::{IntoStreamingRequest, Metadata, Request, Response};
+
 use crate::triple::codec::Codec;
 use crate::triple::compression::CompressionEncoding;
 use crate::triple::decode::Decoding;
@@ -35,6 +39,7 @@ pub struct TripleClient<T> {
     host: Option<http::Uri>,
     inner: T,
     send_compression_encoding: Option<CompressionEncoding>,
+    directory: Option<Box<dyn Directory>>,
 }
 
 impl TripleClient<Connection> {
@@ -51,6 +56,7 @@ impl TripleClient<Connection> {
             host: uri.clone(),
             inner: Connection::new().with_host(uri.unwrap()),
             send_compression_encoding: Some(CompressionEncoding::Gzip),
+            directory: None,
         }
     }
 }
@@ -61,6 +67,7 @@ impl<T> TripleClient<T> {
             host,
             inner,
             send_compression_encoding: Some(CompressionEncoding::Gzip),
+            directory: None,
         }
     }
 
@@ -70,6 +77,14 @@ impl<T> TripleClient<T> {
     {
         TripleClient::new(FilterService::new(self.inner, filter), self.host)
     }
+
+    /// host: http://0.0.0.0:8888
+    pub fn with_directory(self, directory: Box<dyn Directory>) -> Self {
+        TripleClient {
+            directory: Some(directory),
+            ..self
+        }
+    }
 }
 
 impl<T> TripleClient<T>
@@ -77,6 +92,78 @@ where
     T: Service<http::Request<hyper::Body>, Response = http::Response<crate::BoxBody>>,
     T::Error: Into<crate::Error>,
 {
+    fn new_map_request(
+        &self,
+        uri: http::Uri,
+        path: http::uri::PathAndQuery,
+        body: hyper::Body,
+    ) -> http::Request<hyper::Body> {
+        let mut parts = uri.into_parts();
+        parts.path_and_query = Some(path);
+
+        let uri = http::Uri::from_parts(parts).unwrap();
+        let mut req = hyper::Request::builder()
+            .version(http::Version::HTTP_2)
+            .uri(uri.clone())
+            .method("POST")
+            .body(body)
+            .unwrap();
+
+        *req.version_mut() = http::Version::HTTP_2;
+        req.headers_mut()
+            .insert("method", HeaderValue::from_static("POST"));
+        req.headers_mut().insert(
+            "scheme",
+            HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
+        );
+        req.headers_mut()
+            .insert("path", HeaderValue::from_str(uri.path()).unwrap());
+        req.headers_mut().insert(
+            "authority",
+            HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
+        );
+        req.headers_mut().insert(
+            "content-type",
+            HeaderValue::from_static("application/grpc+proto"),
+        );
+        req.headers_mut()
+            .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0"));
+        req.headers_mut()
+            .insert("te", HeaderValue::from_static("trailers"));
+        req.headers_mut().insert(
+            "tri-service-version",
+            HeaderValue::from_static("dubbo-rust/0.1.0"),
+        );
+        req.headers_mut()
+            .insert("tri-service-group", HeaderValue::from_static("cluster"));
+        req.headers_mut().insert(
+            "tri-unit-info",
+            HeaderValue::from_static("dubbo-rust/0.1.0"),
+        );
+        if let Some(_encoding) = self.send_compression_encoding {
+            req.headers_mut()
+                .insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
+        }
+        req.headers_mut().insert(
+            "grpc-accept-encoding",
+            http::HeaderValue::from_static("gzip"),
+        );
+
+        // const (
+        //     TripleContentType    = "application/grpc+proto"
+        //     TripleUserAgent      = "grpc-go/1.35.0-dev"
+        //     TripleServiceVersion = "tri-service-version"
+        //     TripleAttachement    = "tri-attachment"
+        //     TripleServiceGroup   = "tri-service-group"
+        //     TripleRequestID      = "tri-req-id"
+        //     TripleTraceID        = "tri-trace-traceid"
+        //     TripleTraceRPCID     = "tri-trace-rpcid"
+        //     TripleTraceProtoBin  = "tri-trace-proto-bin"
+        //     TripleUnitInfo       = "tri-unit-info"
+        // )
+        req
+    }
+
     fn map_request(
         &self,
         path: http::uri::PathAndQuery,
@@ -114,7 +201,7 @@ where
         );
         req.headers_mut().insert(
             "content-type",
-            HeaderValue::from_static("application/grpc+json"),
+            HeaderValue::from_static("application/grpc+proto"),
         );
         req.headers_mut()
             .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0"));
@@ -159,6 +246,7 @@ where
         req: Request<M1>,
         mut codec: C,
         path: http::uri::PathAndQuery,
+        invocation: RpcInvocation,
     ) -> Result<Response<M2>, crate::status::Status>
     where
         C: Codec<Encode = M1, Decode = M2>,
@@ -174,10 +262,15 @@ where
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
 
-        let req = self.map_request(path, body);
+        let url_list = self.directory.as_ref().expect("msg").list(invocation);
+        let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
+        let http_uri =
+            http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
 
-        let response = self
-            .inner
+        let req = self.new_map_request(http_uri.clone(), path, body);
+
+        let mut conn = Connection::new().with_host(http_uri);
+        let response = conn
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
diff --git a/examples/echo/Cargo.toml b/examples/echo/Cargo.toml
index 9e794b0..f82ef80 100644
--- a/examples/echo/Cargo.toml
+++ b/examples/echo/Cargo.toml
@@ -23,8 +23,11 @@ prost = "0.10.4"
 async-trait = "0.1.56"
 tokio-stream = "0.1"
 
+hyper = { version = "0.14.19", features = ["full"]}
+
 dubbo = {path = "../../dubbo", version = "0.2.0"}
 dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = "0.2.0"}
 
 [build-dependencies]
 dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/echo/src/protos/hello_echo.rs b/examples/echo/src/protos/hello_echo.rs
index ce8ce09..bb2389f 100644
--- a/examples/echo/src/protos/hello_echo.rs
+++ b/examples/echo/src/protos/hello_echo.rs
@@ -64,14 +64,10 @@ pub mod echo_client {
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
-            let path = http::uri::PathAndQuery::from_static(
-                "/grpc.examples.echo.Echo/UnaryEcho",
-            );
-            self.inner.unary(request, codec, path).await
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
+            let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
+            self.inner.unary(request, codec, path, RpcInvocation::default()).await
         }
         /// ServerStreamingEcho is server side streaming.
         pub async fn server_streaming_echo(
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index b698bc7..c4b20cc 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -22,9 +22,12 @@ prost-derive = {version = "0.10", optional = true}
 prost = "0.10.4"
 async-trait = "0.1.56"
 tokio-stream = "0.1"
+tracing = "0.1"
+tracing-subscriber = "0.2.0"
 
 dubbo = {path = "../../dubbo", version = "0.2.0"}
 dubbo-config = {path = "../../config", version = "0.2.0"}
+dubbo-registry-zookeeper = {path = "../../registry-zookeeper", version = "0.2.0"}
 
 [build-dependencies]
 dubbo-build = {path = "../../dubbo-build", version = "0.2.0"}
diff --git a/examples/greeter/proto/greeter.proto b/examples/greeter/proto/greeter.proto
index 0d8be79..a0c466a 100644
--- a/examples/greeter/proto/greeter.proto
+++ b/examples/greeter/proto/greeter.proto
@@ -29,7 +29,7 @@ message GreeterReply {
   string message = 1;
 }
 
-service Greeter{
+service Greeter {
 
   // unary
   rpc greet(GreeterRequest) returns (GreeterReply);
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index 2004b6f..93d509a 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -19,14 +19,40 @@ pub mod protos {
     #![allow(non_camel_case_types)]
     include!(concat!(env!("OUT_DIR"), "/org.apache.dubbo.sample.tri.rs"));
 }
+use std::{str::FromStr, time::Duration, env};
 
-use dubbo::codegen::*;
-use futures_util::StreamExt;
+use dubbo_registry_zookeeper::zookeeper_registry::ZookeeperRegistry;
+
+use dubbo::{cluster::directory::RegistryDirectory, codegen::*, invocation::RpcInvocation};
+use http;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
 
 #[tokio::main]
 async fn main() {
-    let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
+    // a builder for `FmtSubscriber`.
+    let subscriber = FmtSubscriber::builder()
+        // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.)
+        // will be written to stdout.
+        .with_max_level(Level::DEBUG)
+        // completes the builder.
+        .finish();
+
+    tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
+
+    let zk_connect_string = match env::var("ZOOKEEPER_SERVERS") {
+        Ok(val) => val,
+        Err(_) => "localhost:2181".to_string(),
+    };
+    let zkr = ZookeeperRegistry::new(&zk_connect_string);
+    let directory = RegistryDirectory::new(Box::new(zkr));
+
+    let http_uri = http::Uri::from_str(&"http://1.1.1.1:8888").unwrap();
+
+    let mut cli = GreeterClient::new(Connection::new().with_host(http_uri));
+    cli = cli.with_directory(Box::new(directory));
+    //let mut cli = GreeterClient::connect("http://127.0.0.1:8888".to_string());
 
     println!("# unary call");
     let resp = cli
@@ -41,78 +67,7 @@ async fn main() {
     let (_parts, body) = resp.into_parts();
     println!("Response: {:?}", body);
 
-    println!("# client stream");
-    let data = vec![
-        GreeterRequest {
-            name: "msg1 from client streaming".to_string(),
-        },
-        GreeterRequest {
-            name: "msg2 from client streaming".to_string(),
-        },
-        GreeterRequest {
-            name: "msg3 from client streaming".to_string(),
-        },
-    ];
-    let req = futures_util::stream::iter(data);
-    let resp = cli.greet_client_stream(req).await;
-    let client_streaming_resp = match resp {
-        Ok(resp) => resp,
-        Err(err) => return println!("{:?}", err),
-    };
-    let (_parts, resp_body) = client_streaming_resp.into_parts();
-    println!("client streaming, Response: {:?}", resp_body);
-
-    println!("# bi stream");
-    let data = vec![
-        GreeterRequest {
-            name: "msg1 from client".to_string(),
-        },
-        GreeterRequest {
-            name: "msg2 from client".to_string(),
-        },
-        GreeterRequest {
-            name: "msg3 from client".to_string(),
-        },
-    ];
-    let req = futures_util::stream::iter(data);
-
-    let bidi_resp = cli.greet_stream(req).await.unwrap();
-
-    let (parts, mut body) = bidi_resp.into_parts();
-    println!("parts: {:?}", parts);
-    while let Some(item) = body.next().await {
-        match item {
-            Ok(v) => {
-                println!("reply: {:?}", v);
-            }
-            Err(err) => {
-                println!("err: {:?}", err);
-            }
-        }
-    }
-    let trailer = body.trailer().await.unwrap();
-    println!("trailer: {:?}", trailer);
-
-    println!("# server stream");
-    let resp = cli
-        .greet_server_stream(Request::new(GreeterRequest {
-            name: "server streaming req".to_string(),
-        }))
-        .await
-        .unwrap();
-
-    let (parts, mut body) = resp.into_parts();
-    println!("parts: {:?}", parts);
-    while let Some(item) = body.next().await {
-        match item {
-            Ok(v) => {
-                println!("reply: {:?}", v);
-            }
-            Err(err) => {
-                println!("err: {:?}", err);
-            }
-        }
+    loop {
+        tokio::time::sleep(Duration::from_millis(100)).await;
     }
-    let trailer = body.trailer().await.unwrap();
-    println!("trailer: {:?}", trailer);
 }
diff --git a/registry-zookeeper/Cargo.toml b/registry-zookeeper/Cargo.toml
new file mode 100644
index 0000000..cf040fa
--- /dev/null
+++ b/registry-zookeeper/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "dubbo-registry-zookeeper"
+version = "0.2.0"
+edition = "2021"
+license = "Apache-2.0"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+zookeeper = "0.6.1"
+dubbo = {path = "../dubbo/", version = "0.2.0"}
+serde_json = "1.0"
+serde = {version = "1.0.145",features = ["derive"]}
+tracing = "0.1"
diff --git a/registry-zookeeper/LICENSE b/registry-zookeeper/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/registry-zookeeper/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/registry-zookeeper/src/lib.rs b/registry-zookeeper/src/lib.rs
new file mode 100644
index 0000000..7481129
--- /dev/null
+++ b/registry-zookeeper/src/lib.rs
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+pub mod zookeeper_registry;
+
+#[cfg(test)]
+mod tests {
+    /// Placeholder sanity check: two plus two equals four.
+    #[test]
+    fn it_works() {
+        assert_eq!(2 + 2, 4);
+    }
+}
diff --git a/registry-zookeeper/src/zookeeper_registry.rs b/registry-zookeeper/src/zookeeper_registry.rs
new file mode 100644
index 0000000..6c2ef91
--- /dev/null
+++ b/registry-zookeeper/src/zookeeper_registry.rs
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#![allow(unused_variables, dead_code, missing_docs)]
+
+use dubbo::common::url::Url;
+use dubbo::registry::memory_registry::MemoryNotifyListener;
+use dubbo::registry::NotifyListener;
+use dubbo::registry::Registry;
+use dubbo::registry::ServiceEvent;
+use dubbo::StdError;
+use serde::{Deserialize, Serialize};
+use tracing::info;
+use zookeeper::Acl;
+use zookeeper::CreateMode;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::time::Duration;
+use zookeeper::{WatchedEvent, WatchedEventType, Watcher, ZooKeeper};
+
+// Get the service registration metadata from the URL
+/// rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s)
+/// dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+
+pub const REGISTRY_GROUP_KEY: &str = "registry.group";
+
+/// Watcher handed to `ZooKeeper::connect`; it only logs the events it receives.
+struct LoggingWatcher;
+
+impl Watcher for LoggingWatcher {
+    /// Logs the event through `tracing` (like the rest of this file) rather than writing
+    /// to stdout, so the embedding application decides where registry diagnostics go.
+    fn handle(&self, event: WatchedEvent) {
+        info!("zookeeper event: {:?}", event)
+    }
+}
+
+/// Registry implementation that resolves service instances from ZooKeeper.
+pub struct ZookeeperRegistry {
+    /// Parent znode under which the per-application instance nodes are looked up.
+    root_path: String,
+    /// Shared client handle; clones of it are handed to the child watchers.
+    zk_client: Arc<ZooKeeper>,
+
+    /// Listeners registered through `subscribe`, keyed by service name.
+    listeners: RwLock<HashMap<String, Arc<<ZookeeperRegistry as Registry>::NotifyListener>>>,
+}
+
+/// Placeholder listener type; neither callback has an implementation yet.
+pub struct MyNotifyListener {}
+
+impl NotifyListener for MyNotifyListener {
+    /// Not implemented: always panics through `todo!()`.
+    fn notify(&self, _event: ServiceEvent) {
+        todo!()
+    }
+
+    /// Not implemented: always panics through `todo!()`.
+    fn notify_all(&self, _event: ServiceEvent) {
+        todo!()
+    }
+}
+
+/// Service instance record as it is (de)serialized with serde; this file decodes it from
+/// the JSON payload of an instance znode.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct ZkServiceInstance {
+    name: String,
+    address: String,
+    port: i32,
+}
+
+impl ZkServiceInstance {
+    /// Borrows the `name` field.
+    pub fn get_service_name(&self) -> &str {
+        &self.name
+    }
+
+    /// Borrows the `address` field, which callers use as the host part of the URL.
+    pub fn get_host(&self) -> &str {
+        &self.address
+    }
+
+    /// Returns the `port` field.
+    pub fn get_port(&self) -> i32 {
+        self.port
+    }
+}
+
+impl ZookeeperRegistry {
+    /// Connects to ZooKeeper at `connect_string` (15 second timeout) and returns a registry
+    /// whose instance nodes live under `/services`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the connection cannot be established.
+    pub fn new(connect_string: &str) -> ZookeeperRegistry {
+        let zk_client =
+            ZooKeeper::connect(connect_string, Duration::from_secs(15), LoggingWatcher)
+                .expect("failed to connect to ZooKeeper");
+        ZookeeperRegistry {
+            root_path: "/services".to_string(),
+            zk_client: Arc::new(zk_client),
+
+            listeners: RwLock::new(HashMap::new()),
+        }
+    }
+
+    /// Builds the child watcher that forwards instance changes of `service_name`
+    /// (observed under `path`) to `listener`.
+    fn create_listener(
+        &self,
+        path: String,
+        service_name: String,
+        listener: Arc<<ZookeeperRegistry as Registry>::NotifyListener>,
+    ) -> ServiceInstancesChangedListener {
+        ServiceInstancesChangedListener {
+            zk_client: Arc::clone(&self.zk_client),
+            path,
+
+            service_name,
+            listener,
+        }
+    }
+
+    /// Reads the application name stored at `/dubbo/mapping/<service_name>`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the mapping node cannot be read or if its data is not valid UTF-8.
+    fn get_app_name(&self, service_name: String) -> String {
+        let mapping_path = format!("/dubbo/mapping/{}", service_name);
+        let (data, _stat) = self
+            .zk_client
+            .get_data(&mapping_path, false)
+            .unwrap_or_else(|err| panic!("failed to read {}: {:?}", mapping_path, err));
+
+        // Converting the owned buffer avoids the extra copy of `from_utf8` + `to_string`.
+        String::from_utf8(data).unwrap_or_else(|err| panic!("Invalid UTF-8 sequence: {}", err))
+    }
+}
+
+impl Registry for ZookeeperRegistry {
+    type NotifyListener = MemoryNotifyListener;
+
+    /// Not implemented yet; panics through `todo!()`.
+    fn register(&mut self, url: Url) -> Result<(), StdError> {
+        todo!();
+    }
+
+    /// Not implemented yet; panics through `todo!()`.
+    fn unregister(&mut self, url: Url) -> Result<(), StdError> {
+        todo!();
+    }
+
+    /// Subscribes `listener` to the instances of the first service named by `url`.
+    ///
+    /// The application name is resolved through `get_app_name`, a child watch is placed on
+    /// `<root_path>/<app_name>`, and the current instance list is pushed to the listener as
+    /// an `"ADD"` event. Subscribing to a service that already has a listener is a no-op.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when `url` names no service, or when listing the children or
+    /// reading/decoding an instance node fails. On such a failure the listener is removed
+    /// again, so a later `subscribe` call can retry.
+    ///
+    /// # Panics
+    ///
+    /// Panics where `get_app_name` panics, or if the listener lock is poisoned.
+    fn subscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+        let service_names = url.get_service_name();
+        let service_name = service_names
+            .get(0)
+            .ok_or("cannot subscribe: url carries no service name")?;
+        let app_name = self.get_app_name(service_name.clone());
+        let path = format!("{}/{}", self.root_path, app_name);
+
+        let arc_listener = Arc::new(listener);
+        {
+            // Check and insert under one write lock so two concurrent subscribers of the
+            // same service cannot both pass the "already subscribed" test.
+            let mut listeners = self.listeners.write().unwrap();
+            if listeners.contains_key(service_name) {
+                return Ok(());
+            }
+            listeners.insert(service_name.to_string(), Arc::clone(&arc_listener));
+        }
+
+        let zk_listener = self.create_listener(
+            path.clone(),
+            service_name.to_string(),
+            Arc::clone(&arc_listener),
+        );
+
+        let fetched: Result<Vec<Url>, StdError> =
+            match self.zk_client.get_children_w(&path, zk_listener) {
+                Ok(children) => load_instance_urls(&self.zk_client, &path, &children, service_name),
+                Err(err) => Err(err.into()),
+            };
+        let result = match fetched {
+            Ok(urls) => urls,
+            Err(err) => {
+                // Undo the registration; otherwise every retry would hit the early return.
+                self.listeners.write().unwrap().remove(service_name);
+                return Err(err);
+            }
+        };
+
+        info!("notifing {}->{:?}", service_name, result);
+        arc_listener.notify(ServiceEvent {
+            key: service_name.to_string(),
+            action: String::from("ADD"),
+            service: result,
+        });
+        Ok(())
+    }
+
+    /// Not implemented yet; panics through `todo!()`.
+    fn unsubscribe(&self, url: Url, listener: Self::NotifyListener) -> Result<(), StdError> {
+        todo!()
+    }
+}
+
+/// Reads every child znode of `path` and converts it into a
+/// `triple://<host>:<port>/<service_name>` URL.
+///
+/// # Errors
+///
+/// Returns the first failure hit while reading a node, validating its bytes as UTF-8, or
+/// parsing them as a JSON `ZkServiceInstance`.
+fn load_instance_urls(
+    zk_client: &ZooKeeper,
+    path: &str,
+    children: &[String],
+    service_name: &str,
+) -> Result<Vec<Url>, StdError> {
+    children
+        .iter()
+        .map(|node_key| -> Result<Url, StdError> {
+            let (data, _stat) = zk_client.get_data(&format!("{}/{}", path, node_key), false)?;
+            let json = std::str::from_utf8(&data)?;
+            let instance: ZkServiceInstance = serde_json::from_str(json)?;
+            // NOTE(review): `Url::from_url`'s failure type is not visible from this file, so
+            // the original `unwrap` is kept as-is - confirm and propagate if it can fail.
+            let url = Url::from_url(&format!(
+                "triple://{}:{}/{}",
+                instance.get_host(),
+                instance.get_port(),
+                service_name
+            ))
+            .unwrap();
+            Ok(url)
+        })
+        .collect()
+}
+
+/// Child watcher that republishes the instance list of one service to its listener each
+/// time the children of the watched znode change.
+pub struct ServiceInstancesChangedListener {
+    zk_client: Arc<ZooKeeper>,
+    path: String,
+
+    service_name: String,
+    listener: Arc<MemoryNotifyListener>,
+}
+
+impl ServiceInstancesChangedListener {
+    /// Loads the instance stored at `<parent>/<node_key>` and converts it into a
+    /// `triple://<host>:<port>/<service_name>` URL.
+    ///
+    /// Returns `None` after logging a warning when the node cannot be read or decoded,
+    /// for example because it vanished between the listing and the read.
+    fn instance_url(&self, parent: &str, node_key: &str) -> Option<Url> {
+        let node_path = format!("{}/{}", parent, node_key);
+        let data = match self.zk_client.get_data(&node_path, false) {
+            Ok((data, _stat)) => data,
+            Err(err) => {
+                tracing::warn!("skipping instance {}: {:?}", node_path, err);
+                return None;
+            }
+        };
+
+        let decoded = std::str::from_utf8(&data)
+            .map_err(|err| err.to_string())
+            .and_then(|json| {
+                serde_json::from_str::<ZkServiceInstance>(json).map_err(|err| err.to_string())
+            });
+        let instance = match decoded {
+            Ok(instance) => instance,
+            Err(err) => {
+                tracing::warn!("skipping instance {}: {}", node_path, err);
+                return None;
+            }
+        };
+
+        // NOTE(review): `Url::from_url`'s failure type is not visible from this file, so
+        // the original `unwrap` is kept as-is - confirm and handle it if it can fail.
+        let url = Url::from_url(&format!(
+            "triple://{}:{}/{}",
+            instance.get_host(),
+            instance.get_port(),
+            self.service_name
+        ))
+        .unwrap();
+        Some(url)
+    }
+}
+
+impl Watcher for ServiceInstancesChangedListener {
+    /// Reacts to `NodeChildrenChanged` events that carry a path; other events are ignored.
+    ///
+    /// The watch is re-armed by the same `get_children_w` call that reads the children, so
+    /// no change can slip in between reading the list and installing the next watch.
+    /// Failures are logged instead of panicking inside the event callback.
+    fn handle(&self, event: WatchedEvent) {
+        let path = match (event.event_type, event.path) {
+            (WatchedEventType::NodeChildrenChanged, Some(path)) => path,
+            _ => return,
+        };
+
+        let next_watcher = ServiceInstancesChangedListener {
+            zk_client: Arc::clone(&self.zk_client),
+            path: path.clone(),
+
+            service_name: self.service_name.clone(),
+            listener: Arc::clone(&self.listener),
+        };
+        let children = match self.zk_client.get_children_w(&path, next_watcher) {
+            Ok(children) => children,
+            Err(err) => {
+                tracing::error!("failed to list children of {}: {:?}", path, err);
+                return;
+            }
+        };
+
+        let result: Vec<Url> = children
+            .iter()
+            .filter_map(|node_key| self.instance_url(&path, node_key))
+            .collect();
+
+        info!("notifing {}->{:?}", self.service_name, result);
+        self.listener.notify(ServiceEvent {
+            key: self.service_name.clone(),
+            action: String::from("ADD"),
+            service: result,
+        });
+    }
+}
+
+impl NotifyListener for ServiceInstancesChangedListener {
+    /// Not implemented: always panics through `todo!()`.
+    fn notify(&self, _event: ServiceEvent) {
+        todo!()
+    }
+
+    /// Not implemented: always panics through `todo!()`.
+    fn notify_all(&self, _event: ServiceEvent) {
+        todo!()
+    }
+}
+
+/// Smoke test against a live ZooKeeper: watches `/test`, then recreates two ephemeral
+/// children under it.
+///
+/// The server address comes from the `ZOOKEEPER_CONNECT` environment variable (default
+/// `127.0.0.1:2181`) rather than a hard-coded remote host, and the test is ignored by
+/// default because it needs network access.
+#[test]
+#[ignore = "needs a running ZooKeeper; set ZOOKEEPER_CONNECT and run with --ignored"]
+fn it_works() {
+    let connect_string =
+        std::env::var("ZOOKEEPER_CONNECT").unwrap_or_else(|_| "127.0.0.1:2181".to_string());
+    let zk_client =
+        ZooKeeper::connect(&connect_string, Duration::from_secs(15), LoggingWatcher).unwrap();
+
+    // As in the original test, the calls below are best-effort: their results are
+    // deliberately discarded (for instance the nodes may not exist when deleted).
+    let _ = zk_client.get_children_w("/test", TestZkWatcher);
+    for node in ["/test/a", "/test/b"] {
+        let _ = zk_client.delete(node, None);
+    }
+    for node in ["/test/a", "/test/b"] {
+        let _ = zk_client.create(
+            node,
+            vec![1, 3],
+            Acl::open_unsafe().clone(),
+            CreateMode::Ephemeral,
+        );
+    }
+    let _ = zk_client.close();
+}
+
+/// Watcher used by `it_works`; prints every event it receives.
+struct TestZkWatcher;
+
+impl Watcher for TestZkWatcher {
+    fn handle(&self, event: WatchedEvent) {
+        println!("event: {:?}", event);
+    }
+}

Reply via email to