luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775779945
##########
File path:
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
LOG.info("Register DataFlowInfoListener successfully");
}
+ /**
+ * getDataFlowInfo
+ *
+ * @param id
+ * @return
+ */
public DataFlowInfo getDataFlowInfo(long id) {
- synchronized (lock) {
+ synchronized (LOCK) {
return dataFlowInfoMap.get(id);
}
}
- public interface DataFlowInfoListener {
- void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
- }
-
- private class DataFlowsChildrenWatcherListener implements
ChildrenWatcherListener {
-
- @Override
- public void onChildAdded(ChildData childData) throws Exception {
- LOG.info("DataFlow Added event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
+ /**
+ * addDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
Review comment:
add LOCK operation.
##########
File path:
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
LOG.info("Register DataFlowInfoListener successfully");
}
+ /**
+ * getDataFlowInfo
+ *
+ * @param id
+ * @return
+ */
public DataFlowInfo getDataFlowInfo(long id) {
- synchronized (lock) {
+ synchronized (LOCK) {
return dataFlowInfoMap.get(id);
}
}
- public interface DataFlowInfoListener {
- void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
- }
-
- private class DataFlowsChildrenWatcherListener implements
ChildrenWatcherListener {
-
- @Override
- public void onChildAdded(ChildData childData) throws Exception {
- LOG.info("DataFlow Added event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
+ /**
+ * addDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
+ if (oldDataFlowInfo == null) {
+ LOG.info("Try to add dataFlow {}", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.addDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow
added", e);
+ }
}
-
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo =
objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo =
getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
- if (oldDataFlowInfo == null) {
- LOG.info("Try to add dataFlow {}", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.addDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow added", e);
- }
- }
- } else {
- LOG.warn("DataFlow {} should not be exist", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.updateDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow updated", e);
- }
- }
+ } else {
+ LOG.warn("DataFlow {} should not be exist", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow
updated", e);
}
}
}
- @Override
- public void onChildUpdated(ChildData childData) throws Exception {
- LOG.info("DataFlow Updated event retrieved");
+ }
- final byte[] data = childData.getData();
- if (data == null) {
- return;
- }
+ /**
+ * updateDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo =
objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo =
getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
- if (oldDataFlowInfo == null) {
- LOG.warn("DataFlow {} should already be exist.",
dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.addDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow updated", e);
- }
- }
- } else {
- if (dataFlowInfo.equals(oldDataFlowInfo)) {
- LOG.info("DataFlowInfo has not been changed, ignore
update.");
- return;
- }
-
- LOG.info("Try to update dataFlow {}.", dataFlowId);
-
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.updateDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow updated", e);
- }
- }
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
Review comment:
add LOCK operation.
##########
File path:
inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void
registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
LOG.info("Register DataFlowInfoListener successfully");
}
+ /**
+ * getDataFlowInfo
+ *
+ * @param id
+ * @return
+ */
public DataFlowInfo getDataFlowInfo(long id) {
- synchronized (lock) {
+ synchronized (LOCK) {
return dataFlowInfoMap.get(id);
}
}
- public interface DataFlowInfoListener {
- void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
- }
-
- private class DataFlowsChildrenWatcherListener implements
ChildrenWatcherListener {
-
- @Override
- public void onChildAdded(ChildData childData) throws Exception {
- LOG.info("DataFlow Added event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
+ /**
+ * addDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
+ if (oldDataFlowInfo == null) {
+ LOG.info("Try to add dataFlow {}", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.addDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow
added", e);
+ }
}
-
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo =
objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo =
getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
- if (oldDataFlowInfo == null) {
- LOG.info("Try to add dataFlow {}", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.addDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow added", e);
- }
- }
- } else {
- LOG.warn("DataFlow {} should not be exist", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.updateDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow updated", e);
- }
- }
+ } else {
+ LOG.warn("DataFlow {} should not be exist", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow
updated", e);
}
}
}
- @Override
- public void onChildUpdated(ChildData childData) throws Exception {
- LOG.info("DataFlow Updated event retrieved");
+ }
- final byte[] data = childData.getData();
- if (data == null) {
- return;
- }
+ /**
+ * updateDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo =
objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo =
getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
- if (oldDataFlowInfo == null) {
- LOG.warn("DataFlow {} should already be exist.",
dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.addDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow updated", e);
- }
- }
- } else {
- if (dataFlowInfo.equals(oldDataFlowInfo)) {
- LOG.info("DataFlowInfo has not been changed, ignore
update.");
- return;
- }
-
- LOG.info("Try to update dataFlow {}.", dataFlowId);
-
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.updateDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener
data flow updated", e);
- }
- }
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId,
dataFlowInfo);
+ if (oldDataFlowInfo == null) {
+ LOG.warn("DataFlow {} should already be exist.", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.addDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow
updated", e);
}
}
- }
-
- @Override
- public void onChildRemoved(ChildData childData) throws Exception {
- LOG.info("DataFlow Removed event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
+ } else {
+ if (dataFlowInfo.equals(oldDataFlowInfo)) {
+ LOG.info("DataFlowInfo has not been changed, ignore update.");
return;
}
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo =
objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo =
getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- dataFlowInfoMap.remove(dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.removeDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener data
flow deleted", e);
- }
+ LOG.info("Try to update dataFlow {}.", dataFlowId);
+
+ for (DataFlowInfoListener dataFlowInfoListener :
dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow
updated", e);
}
}
}
- @Override
- public void onInitialized(List<ChildData> childData) throws Exception {
- LOG.info("Initialized event retrieved");
+ }
- for (ChildData singleChildData : childData) {
- onChildAdded(singleChildData);
+ /**
+ * removeDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ long dataFlowId = dataFlowInfo.getId();
+
+ dataFlowInfoMap.remove(dataFlowId);
Review comment:
add LOCK operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]