zjffdu commented on a change in pull request #3397: [ZEPPELIN-4208] Cluster synchronize InterpreterSetting URL: https://github.com/apache/zeppelin/pull/3397#discussion_r300565141
########## File path: zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ########## @@ -978,4 +1041,85 @@ public void onParagraphUpdate(Paragraph p) throws IOException { public void onParagraphStatusChange(Paragraph p, Job.Status status) throws IOException { } + + @Override + public void onClusterEvent(String msg) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("onClusterEvent : {}", msg); + } + + try { + Gson gson = new Gson(); + ClusterMessage message = ClusterMessage.deserializeMessage(msg); + String id = message.get("id"); + String name = message.get("name"); + String group = message.get("group"); + InterpreterOption option = null; + Map<String, InterpreterProperty> properties = null; + List<Dependency> dependencies = null; + String jsonOption = message.get("option"); + if (!StringUtils.isBlank(jsonOption)) { + option = InterpreterOption.fromJson(jsonOption); + } + String jsonProperties = message.get("properties"); + if (!StringUtils.isBlank(jsonProperties)) { + properties = gson.fromJson(jsonProperties, + new TypeToken<Map<String, InterpreterProperty>>() {}.getType()); + } + String jsonDependencies = message.get("dependencies"); + if (!StringUtils.isBlank(jsonOption)) { + dependencies = gson.fromJson(jsonDependencies, new TypeToken<List<Dependency>>() {}.getType()); + } + + switch (message.clusterEvent) { + case CREATE_INTP_SETTING: + inlineCreateNewSetting(name, group, dependencies, option, properties); + break; + case UPDATE_INTP_SETTING: + inlineSetPropertyAndRestart(id, option, properties, dependencies, false); + break; + case DELETE_INTP_SETTING: + inlineRemove(id, false); + break; + default: + LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg); + break; + } + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } catch (InterpreterException e) { + LOGGER.error(e.getMessage(), e); + } + } + + // broadcast cluster event + private void broadcastClusterEvent(ClusterEvent event, InterpreterSetting intpSetting) { + if (!conf.isClusterMode()) { + return; + } + + List<Dependency> dependencies = intpSetting.getDependencies(); + Map<String, InterpreterProperty> properties + = (Map<String, InterpreterProperty>) intpSetting.getProperties(); + InterpreterOption intpOption = intpSetting.getOption(); + + HashMap<String, String> params = new HashMap<>(); + String jsonDep = gson.toJson(dependencies, new TypeToken<List<Dependency>>() { + }.getType()); + String jsonProps = gson.toJson(properties, new TypeToken<Map<String, InterpreterProperty>>() { + }.getType()); + params.put("id", intpSetting.getId()); Review comment: How about create toJson in InterpreterSetting.java, so that you don't to create toJson in Dependency and InterpreterOption ? I believe we may need to serialization interpreter setting elsewhere, although now we do it via InterpreterSettingInfoSaving, but this is due to legacy reason. We should unify the serialization of InterpreterSetting. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services