zjffdu commented on a change in pull request #3397: [ZEPPELIN-4208] Cluster 
synchronize InterpreterSetting
URL: https://github.com/apache/zeppelin/pull/3397#discussion_r300565141
 
 

 ##########
 File path: 
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
 ##########
 @@ -978,4 +1041,85 @@ public void onParagraphUpdate(Paragraph p) throws 
IOException {
   public void onParagraphStatusChange(Paragraph p, Job.Status status) throws 
IOException {
 
   }
+
+  @Override
+  public void onClusterEvent(String msg) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("onClusterEvent : {}", msg);
+    }
+
+    try {
+      Gson gson = new Gson();
+      ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+      String id = message.get("id");
+      String name = message.get("name");
+      String group = message.get("group");
+      InterpreterOption option = null;
+      Map<String, InterpreterProperty> properties = null;
+      List<Dependency> dependencies = null;
+      String jsonOption = message.get("option");
+      if (!StringUtils.isBlank(jsonOption)) {
+        option = InterpreterOption.fromJson(jsonOption);
+      }
+      String jsonProperties = message.get("properties");
+      if (!StringUtils.isBlank(jsonProperties)) {
+        properties = gson.fromJson(jsonProperties,
+            new TypeToken<Map<String, InterpreterProperty>>() {}.getType());
+      }
+      String jsonDependencies = message.get("dependencies");
+      if (!StringUtils.isBlank(jsonOption)) {
+        dependencies = gson.fromJson(jsonDependencies, new 
TypeToken<List<Dependency>>() {}.getType());
+      }
+
+      switch (message.clusterEvent) {
+        case CREATE_INTP_SETTING:
+          inlineCreateNewSetting(name, group, dependencies, option, 
properties);
+          break;
+        case UPDATE_INTP_SETTING:
+          inlineSetPropertyAndRestart(id, option, properties, dependencies, 
false);
+          break;
+        case DELETE_INTP_SETTING:
+          inlineRemove(id, false);
+          break;
+        default:
+          LOGGER.error("Unknown clusterEvent:{}, msg:{} ", 
message.clusterEvent, msg);
+          break;
+      }
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
+    } catch (InterpreterException e) {
+      LOGGER.error(e.getMessage(), e);
+    }
+  }
+
+  // broadcast cluster event
+  private void broadcastClusterEvent(ClusterEvent event, InterpreterSetting 
intpSetting) {
+    if (!conf.isClusterMode()) {
+      return;
+    }
+
+    List<Dependency> dependencies = intpSetting.getDependencies();
+    Map<String, InterpreterProperty> properties
+        = (Map<String, InterpreterProperty>) intpSetting.getProperties();
+    InterpreterOption intpOption = intpSetting.getOption();
+
+    HashMap<String, String> params = new HashMap<>();
+    String jsonDep = gson.toJson(dependencies, new 
TypeToken<List<Dependency>>() {
+    }.getType());
+    String jsonProps = gson.toJson(properties, new TypeToken<Map<String, 
InterpreterProperty>>() {
+    }.getType());
+    params.put("id", intpSetting.getId());
 
 Review comment:
   How about create toJson in InterpreterSetting.java, so that you don't to 
create toJson in Dependency and InterpreterOption ? I believe we may need to 
serialization interpreter setting elsewhere, although now we do it via 
InterpreterSettingInfoSaving, but this is due to legacy reason. We should unify 
the serialization of InterpreterSetting. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to