luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775779945



##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void 
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements 
ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);

Review comment:
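       Add the LOCK operation here: this `dataFlowInfoMap.put(...)` in `addDataFlow` is no longer inside `synchronized (LOCK)`, although `getDataFlowInfo` still reads the map under that lock and the removed `onChildAdded` code performed the put inside `synchronized (lock)`.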

##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void 
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements 
ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.info("Try to add dataFlow {}", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow 
added", e);
+                }
             }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = 
objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = 
getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow added", e);
-                        }
-                    }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow updated", e);
-                        }
-                    }
+        } else {
+            LOG.warn("DataFlow {} should not be exist", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow 
updated", e);
                 }
             }
         }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
+    }
 
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = 
objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = 
getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", 
dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore 
update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
-
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow updated", e);
-                        }
-                    }
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);

Review comment:
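       Add the LOCK operation here: this `dataFlowInfoMap.put(...)` in `updateDataFlow` is no longer inside `synchronized (LOCK)`, although `getDataFlowInfo` still reads the map under that lock and the removed `onChildUpdated` code performed the put inside `synchronized (lock)`.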

##########
File path: 
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void 
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements 
ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.info("Try to add dataFlow {}", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow 
added", e);
+                }
             }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = 
objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = 
getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow added", e);
-                        }
-                    }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow updated", e);
-                        }
-                    }
+        } else {
+            LOG.warn("DataFlow {} should not be exist", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow 
updated", e);
                 }
             }
         }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
+    }
 
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = 
objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = 
getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", 
dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore 
update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
-
-                    for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener 
data flow updated", e);
-                        }
-                    }
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, 
dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.warn("DataFlow {} should already be exist.", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow 
updated", e);
                 }
             }
-        }
-
-        @Override
-        public void onChildRemoved(ChildData childData) throws Exception {
-            LOG.info("DataFlow Removed event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
+        } else {
+            if (dataFlowInfo.equals(oldDataFlowInfo)) {
+                LOG.info("DataFlowInfo has not been changed, ignore update.");
                 return;
             }
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = 
objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = 
getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                dataFlowInfoMap.remove(dataFlowId);
-                for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
-                    try {
-                        dataFlowInfoListener.removeDataFlow(dataFlowInfo);
-                    } catch (Exception e) {
-                        LOG.warn("Error happens when notifying listener data 
flow deleted", e);
-                    }
+            LOG.info("Try to update dataFlow {}.", dataFlowId);
+
+            for (DataFlowInfoListener dataFlowInfoListener : 
dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow 
updated", e);
                 }
             }
         }
 
-        @Override
-        public void onInitialized(List<ChildData> childData) throws Exception {
-            LOG.info("Initialized event retrieved");
+    }
 
-            for (ChildData singleChildData : childData) {
-                onChildAdded(singleChildData);
+    /**
+     * removeDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+
+        dataFlowInfoMap.remove(dataFlowId);

Review comment:
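       Add the LOCK operation here: this `dataFlowInfoMap.remove(dataFlowId)` in `removeDataFlow` is no longer inside `synchronized (LOCK)`, although `getDataFlowInfo` still reads the map under that lock and the removed `onChildRemoved` code performed the remove inside `synchronized (lock)`.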




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to