luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775779625



##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener 
{
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, 
UnhandledErrorListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, 
newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) 
throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         
client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {

Review comment:
       ZookeeperMetaWatcher implements MetaWatcher.
   ZookeeperWatcherUtils is a basic util class.
   
   chantccc yesterday
   ZookeeperWatcher is a basic util class wrapping zk related operations. It 
should not implement MetaWatcher.
   Use another class ZookeeperMetaWatcher which implements MetaWatcher instead.

##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener 
{
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, 
UnhandledErrorListener {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, 
newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) 
throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         
client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {
+            final String dataFlowsWatchingPath = 
getWatchingPathOfDataFlowsInCluster(
+                    config.getString(Constants.CLUSTER_ID));
+            DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener 
= new DataFlowsChildrenWatcherListener(
+                    this.client, metaListener);
+            this.registerPathChildrenWatcher(
+                    dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, 
true);
+            final List<ChildData> childData = 
this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+            if (childData != null) {
+                dataFlowsChildrenWatcherListener.onInitialized(childData);
+            }
+        }
+    }
+
+    /**
+     * If you want to change the path rule here, please change 
ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+     * too.
+     */
+    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {

Review comment:
       ZookeeperMetaWatcher implements MetaWatcher.
   ZookeeperWatcherUtils is a basic util class.
   
   chantccc yesterday
   ZookeeperWatcher is a basic util class wrapping zk related operations. It 
should not implement MetaWatcher.
   Use another class ZookeeperMetaWatcher which implements MetaWatcher instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to