http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 78ff078..a376623 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -54,9 +54,13 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; import org.apache.zeppelin.notebook.socket.WatcherMessage; +import org.apache.zeppelin.rest.exception.ForbiddenException; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; +import org.apache.zeppelin.service.NotebookService; +import org.apache.zeppelin.service.ServiceContext; +import org.apache.zeppelin.service.SimpleServiceCallback; import org.apache.zeppelin.ticket.TicketContainer; import org.apache.zeppelin.types.InterpreterSettingsList; import org.apache.zeppelin.user.AuthenticationInfo; @@ -75,6 +79,7 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import java.io.IOException; +import java.lang.reflect.Type; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.text.ParseException; @@ -102,11 +107,11 @@ import java.util.regex.Pattern; */ public class NotebookServer extends WebSocketServlet implements NotebookSocketListener, - JobListenerFactory, - AngularObjectRegistryListener, - RemoteInterpreterProcessListener, - ApplicationEventListener, - NotebookServerMBean { + JobListenerFactory, + AngularObjectRegistryListener, + RemoteInterpreterProcessListener, + ApplicationEventListener, + NotebookServerMBean { /** * Job manager service type. @@ -127,8 +132,8 @@ public class NotebookServer extends WebSocketServlet private HashSet<String> collaborativeModeList = new HashSet<>(); private Boolean collaborativeModeEnable = ZeppelinConfiguration - .create() - .isZeppelinNotebookCollaborativeModeEnable(); + .create() + .isZeppelinNotebookCollaborativeModeEnable(); private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); private static Gson gson = new GsonBuilder() .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") @@ -140,6 +145,8 @@ public class NotebookServer extends WebSocketServlet final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>(); final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>(); + private NotebookService notebookService; + private ExecutorService executorService = Executors.newFixedThreadPool(10); /** @@ -154,6 +161,13 @@ public class NotebookServer extends WebSocketServlet return ZeppelinServer.notebook; } + private synchronized NotebookService getNotebookService() { + if (this.notebookService == null) { + this.notebookService = new NotebookService(notebook()); + } + return this.notebookService; + } + @Override public void configure(WebSocketServletFactory factory) { factory.setCreator(new NotebookWebSocketCreator(this)); @@ -174,8 +188,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onOpen(NotebookSocket conn) { - LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort()); + LOG.info("New connection from {}", conn); connectedSockets.add(conn); } @@ -184,12 +197,13 @@ public class NotebookServer extends WebSocketServlet Notebook notebook = notebook(); try { Message messagereceived = deserializeMessage(msg); - LOG.debug("RECEIVE << " + messagereceived.op + - ", RECEIVE PRINCIPAL << " + messagereceived.principal + - ", RECEIVE TICKET << " + messagereceived.ticket + - ", RECEIVE ROLES << " + messagereceived.roles + - ", RECEIVE DATA << " + messagereceived.data); - + if (messagereceived.op != OP.PING) { + LOG.debug("RECEIVE: " + messagereceived.op + + ", RECEIVE PRINCIPAL: " + messagereceived.principal + + ", RECEIVE TICKET: " + messagereceived.ticket + + ", RECEIVE ROLES: " + messagereceived.roles + + ", RECEIVE DATA: " + messagereceived.data); + } if (LOG.isTraceEnabled()) { LOG.trace("RECEIVE MSG = " + messagereceived); } @@ -237,22 +251,22 @@ public class NotebookServer extends WebSocketServlet // Lets be elegant here switch (messagereceived.op) { case LIST_NOTES: - unicastNoteList(conn, subject, userAndRoles); + listNotes(conn, messagereceived); break; case RELOAD_NOTES_FROM_REPO: broadcastReloadedNoteList(subject, userAndRoles); break; case GET_HOME_NOTE: - sendHomeNote(conn, userAndRoles, notebook, messagereceived); + getHomeNote(conn, messagereceived); break; case GET_NOTE: - sendNote(conn, userAndRoles, notebook, messagereceived); + getNote(conn, messagereceived); break; case NEW_NOTE: - createNote(conn, userAndRoles, notebook, messagereceived); + createNote(conn, messagereceived); break; case DEL_NOTE: - removeNote(conn, userAndRoles, notebook, messagereceived); + deleteNote(conn, messagereceived); break; case REMOVE_FOLDER: removeFolder(conn, userAndRoles, notebook, messagereceived); @@ -270,55 +284,55 @@ public class NotebookServer extends WebSocketServlet restoreFolder(conn, userAndRoles, notebook, messagereceived); break; case RESTORE_NOTE: - restoreNote(conn, userAndRoles, notebook, messagereceived); + restoreNote(conn, messagereceived); break; case RESTORE_ALL: restoreAll(conn, userAndRoles, notebook, messagereceived); break; case CLONE_NOTE: - cloneNote(conn, userAndRoles, notebook, messagereceived); + cloneNote(conn, messagereceived); break; case IMPORT_NOTE: - importNote(conn, userAndRoles, notebook, messagereceived); + importNote(conn, messagereceived); break; case COMMIT_PARAGRAPH: - updateParagraph(conn, userAndRoles, notebook, messagereceived); + updateParagraph(conn, messagereceived); break; case RUN_PARAGRAPH: - runParagraph(conn, userAndRoles, notebook, messagereceived); + runParagraph(conn, messagereceived); break; case PARAGRAPH_EXECUTED_BY_SPELL: broadcastSpellExecution(conn, userAndRoles, notebook, messagereceived); break; case RUN_ALL_PARAGRAPHS: - runAllParagraphs(conn, userAndRoles, notebook, messagereceived); + runAllParagraphs(conn, messagereceived); break; case CANCEL_PARAGRAPH: - cancelParagraph(conn, userAndRoles, notebook, messagereceived); + cancelParagraph(conn, messagereceived); break; case MOVE_PARAGRAPH: - moveParagraph(conn, userAndRoles, notebook, messagereceived); + moveParagraph(conn, messagereceived); break; case INSERT_PARAGRAPH: - insertParagraph(conn, userAndRoles, notebook, messagereceived); + insertParagraph(conn, messagereceived); break; case COPY_PARAGRAPH: - copyParagraph(conn, userAndRoles, notebook, messagereceived); + copyParagraph(conn, messagereceived); break; case PARAGRAPH_REMOVE: - removeParagraph(conn, userAndRoles, notebook, messagereceived); + removeParagraph(conn, messagereceived); break; case PARAGRAPH_CLEAR_OUTPUT: - clearParagraphOutput(conn, userAndRoles, notebook, messagereceived); + clearParagraphOutput(conn, messagereceived); break; case PARAGRAPH_CLEAR_ALL_OUTPUT: - clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived); + clearAllParagraphOutput(conn, messagereceived); break; case NOTE_UPDATE: - updateNote(conn, userAndRoles, notebook, messagereceived); + updateNote(conn, messagereceived); break; case NOTE_RENAME: - renameNote(conn, userAndRoles, notebook, messagereceived); + renameNote(conn, messagereceived); break; case FOLDER_RENAME: renameFolder(conn, userAndRoles, notebook, messagereceived); @@ -327,7 +341,7 @@ public class NotebookServer extends WebSocketServlet updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived); break; case COMPLETION: - completion(conn, userAndRoles, notebook, messagereceived); + completion(conn, messagereceived); break; case PING: break; //do nothing @@ -344,19 +358,19 @@ public class NotebookServer extends WebSocketServlet sendAllConfigurations(conn, userAndRoles, notebook); break; case CHECKPOINT_NOTE: - checkpointNote(conn, notebook, messagereceived); + checkpointNote(conn, messagereceived); break; case LIST_REVISION_HISTORY: - listRevisionHistory(conn, notebook, messagereceived); + listRevisionHistory(conn, messagereceived); break; case SET_NOTE_REVISION: - setNoteRevision(conn, userAndRoles, notebook, messagereceived); + setNoteRevision(conn, messagereceived); break; case NOTE_REVISION: - getNoteByRevision(conn, notebook, messagereceived); + getNoteByRevision(conn, messagereceived); break; case NOTE_REVISION_FOR_COMPARE: - getNoteByRevisionForCompare(conn, notebook, messagereceived); + getNoteByRevisionForCompare(conn, messagereceived); break; case LIST_NOTE_JOBS: unicastNoteJobInfo(conn, messagereceived); @@ -371,16 +385,16 @@ public class NotebookServer extends WebSocketServlet getEditorSetting(conn, messagereceived); break; case GET_INTERPRETER_SETTINGS: - getInterpreterSettings(conn, subject); + getInterpreterSettings(conn); break; case WATCHER: switchConnectionToWatcher(conn, messagereceived); break; case SAVE_NOTE_FORMS: - saveNoteForms(conn, userAndRoles, notebook, messagereceived); + saveNoteForms(conn, messagereceived); break; case REMOVE_NOTE_FORMS: - removeNoteForms(conn, userAndRoles, notebook, messagereceived); + removeNoteForms(conn, messagereceived); break; case PATCH_PARAGRAPH: patchParagraph(conn, userAndRoles, notebook, messagereceived); @@ -395,14 +409,14 @@ public class NotebookServer extends WebSocketServlet @Override public void onClose(NotebookSocket conn, int code, String reason) { - LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort(), code, reason); + LOG.info("Closed connection to {} ({}) {}", conn, code, reason); removeConnectionFromAllNote(conn); connectedSockets.remove(conn); removeUserConnection(conn.getUser(), conn); } private void removeUserConnection(String user, NotebookSocket conn) { + LOG.debug("Remove user connection {} for user: {}", conn, user); if (userConnectedSockets.containsKey(user)) { userConnectedSockets.get(user).remove(conn); } else { @@ -411,6 +425,7 @@ public class NotebookServer extends WebSocketServlet } private void addUserConnection(String user, NotebookSocket conn) { + LOG.debug("Add user connection {} for user: {}", conn, user); conn.setUser(user); if (userConnectedSockets.containsKey(user)) { userConnectedSockets.get(user).add(conn); @@ -430,6 +445,7 @@ public class NotebookServer extends WebSocketServlet } private void addConnectionToNote(String noteId, NotebookSocket socket) { + LOG.debug("Add connection {} to note: {}", socket, noteId); synchronized (noteSocketMap) { removeConnectionFromAllNote(socket); // make sure a socket relates only a // single note. @@ -446,6 +462,7 @@ public class NotebookServer extends WebSocketServlet } private void removeConnectionFromNote(String noteId, NotebookSocket socket) { + LOG.debug("Remove connection {} from note: {}", socket, noteId); synchronized (noteSocketMap) { List<NotebookSocket> socketList = noteSocketMap.get(noteId); if (socketList != null) { @@ -455,7 +472,7 @@ public class NotebookServer extends WebSocketServlet } } - private void removeNote(String noteId) { + private void removeConn(String noteId) { synchronized (noteSocketMap) { List<NotebookSocket> socketList = noteSocketMap.remove(noteId); } @@ -485,7 +502,7 @@ public class NotebookServer extends WebSocketServlet message.put("status", collaborativeStatusNew); if (collaborativeStatusNew) { HashSet<String> userList = new HashSet<>(); - for (NotebookSocket noteSocket: socketList) { + for (NotebookSocket noteSocket : socketList) { userList.add(noteSocket.getUser()); } message.put("users", userList); @@ -626,7 +643,8 @@ public class NotebookServer extends WebSocketServlet } public List<Map<String, String>> generateNotesInfo(boolean needsReload, - AuthenticationInfo subject, Set<String> userAndRoles) { + AuthenticationInfo subject, + Set<String> userAndRoles) { Notebook notebook = notebook(); ZeppelinConfiguration conf = notebook.getConf(); @@ -690,7 +708,7 @@ public class NotebookServer extends WebSocketServlet } public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap, - Paragraph defaultParagraph) { + Paragraph defaultParagraph) { if (null != userParagraphMap) { for (String user : userParagraphMap.keySet()) { multicastToUser(user, @@ -706,7 +724,7 @@ public class NotebookServer extends WebSocketServlet new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); } - public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) { + public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) { if (subject == null) { subject = new AuthenticationInfo(StringUtils.EMPTY); } @@ -717,10 +735,16 @@ public class NotebookServer extends WebSocketServlet broadcastNoteListExcept(notesInfo, subject); } - public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject, - HashSet<String> userAndRoles) { - List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles); - unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); + public void listNotes(NotebookSocket conn, Message message) throws IOException { + getNotebookService().listNotes(false, getServiceContext(message), + new WebSocketServiceCallback<List<Map<String, String>>>(conn) { + @Override + public void onSuccess(List<Map<String, String>> notesInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesInfo, context); + unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); + } + }); } public void broadcastReloadedNoteList(AuthenticationInfo subject, HashSet userAndRoles) { @@ -736,7 +760,7 @@ public class NotebookServer extends WebSocketServlet } private void broadcastNoteListExcept(List<Map<String, String>> notesInfo, - AuthenticationInfo subject) { + AuthenticationInfo subject) { Set<String> userAndRoles; NotebookAuthorization authInfo = NotebookAuthorization.getInstance(); for (String user : userConnectedSockets.keySet()) { @@ -752,7 +776,7 @@ public class NotebookServer extends WebSocketServlet } void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles, - Set<String> allowed) throws IOException { + Set<String> allowed) throws IOException { LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed); conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info", @@ -761,32 +785,18 @@ public class NotebookServer extends WebSocketServlet .toString()))); } - /** - * @return false if user doesn't have reader permission for this paragraph - */ - private boolean hasParagraphReaderPermission(NotebookSocket conn, Notebook notebook, - String noteId, HashSet<String> userAndRoles, String principal, String op) - throws IOException { - NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); - if (!notebookAuthorization.isReader(noteId, userAndRoles)) { - permissionError(conn, op, principal, userAndRoles, - notebookAuthorization.getOwners(noteId)); - return false; - } - - return true; - } /** * @return false if user doesn't have runner permission for this paragraph */ private boolean hasParagraphRunnerPermission(NotebookSocket conn, Notebook notebook, - String noteId, HashSet<String> userAndRoles, String principal, String op) - throws IOException { + String noteId, HashSet<String> userAndRoles, + String principal, String op) + throws IOException { NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isRunner(noteId, userAndRoles)) { permissionError(conn, op, principal, userAndRoles, - notebookAuthorization.getOwners(noteId)); + notebookAuthorization.getOwners(noteId)); return false; } @@ -797,8 +807,9 @@ public class NotebookServer extends WebSocketServlet * @return false if user doesn't have writer permission for this paragraph */ private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook, - String noteId, HashSet<String> userAndRoles, String principal, String op) - throws IOException { + String noteId, HashSet<String> userAndRoles, + String principal, String op) + throws IOException { NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { permissionError(conn, op, principal, userAndRoles, @@ -813,7 +824,8 @@ public class NotebookServer extends WebSocketServlet * @return false if user doesn't have owner permission for this paragraph */ private boolean hasParagraphOwnerPermission(NotebookSocket conn, Notebook notebook, String noteId, - HashSet<String> userAndRoles, String principal, String op) throws IOException { + HashSet<String> userAndRoles, String principal, + String op) throws IOException { NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { permissionError(conn, op, principal, userAndRoles, @@ -824,66 +836,45 @@ public class NotebookServer extends WebSocketServlet return true; } - private void sendNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { - LOG.info("New operation from {} : {} : {} : {} : {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort(), fromMessage.principal, fromMessage.op, - fromMessage.get("id")); - + private void getNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; } - - String user = fromMessage.principal; - - Note note = notebook.getNote(noteId); - - if (note != null) { - if (!hasParagraphReaderPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "read")) { - return; - } - - addConnectionToNote(note.getId(), conn); - - if (note.isPersonalizedMode()) { - note = note.getUserNote(user); - } - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); - sendAllAngularObjects(note, user, conn); - } else { - conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); - } + getNotebookService().getNote(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + addConnectionToNote(note.getId(), conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); + } + }); } - private void sendHomeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { - String noteId = notebook.getConf().getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); - String user = fromMessage.principal; - - Note note = null; - if (noteId != null) { - note = notebook.getNote(noteId); - } - - if (note != null) { - if (!hasParagraphReaderPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "read")) { - return; - } + private void getHomeNote(NotebookSocket conn, + Message fromMessage) throws IOException { - addConnectionToNote(note.getId(), conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); - sendAllAngularObjects(note, user, conn); - } else { - removeConnectionFromAllNote(conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); - } + getNotebookService().getHomeNote(getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + if (note != null) { + addConnectionToNote(note.getId(), conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); + } else { + removeConnectionFromAllNote(conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); + } + } + }); } - private void updateNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void updateNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); @@ -894,35 +885,20 @@ public class NotebookServer extends WebSocketServlet return; } - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "update")) { - return; - } - - Note note = notebook.getNote(noteId); - if (note != null) { - if (!(Boolean) note.getConfig().get("isZeppelinNotebookCronEnable")) { - if (config.get("cron") != null) { - config.remove("cron"); - } - } - boolean cronUpdated = isCronUpdated(config, note.getConfig()); - note.setName(name); - note.setConfig(config); - if (cronUpdated) { - notebook.refreshCron(note.getId()); - } - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - note.persist(subject); - broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name).put("config", config) - .put("info", note.getInfo())); - broadcastNoteList(subject, userAndRoles); - } + getNotebookService().updateNote(noteId, name, config, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name) + .put("config", config) + .put("info", note.getInfo())); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } private void updatePersonalizedMode(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + Notebook notebook, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); String personalized = (String) fromMessage.get("personalized"); boolean isPersonalized = personalized.equals("true") ? true : false; @@ -945,44 +921,31 @@ public class NotebookServer extends WebSocketServlet } } - private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { - renameNote(conn, userAndRoles, notebook, fromMessage, "rename"); - } - - private void renameNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage, String op) throws IOException { + private void renameNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); - if (noteId == null) { return; } - - if (!hasParagraphOwnerPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "rename")) { - return; - } - - Note note = notebook.getNote(noteId); - if (note != null) { - note.setName(name); - note.setCronSupported(notebook.getConf()); - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - note.persist(subject); - broadcastNote(note); - broadcastNoteList(subject, userAndRoles); - } + getNotebookService().renameNote(noteId, name, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + broadcastNote(note); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { renameFolder(conn, userAndRoles, notebook, fromMessage, "rename"); } private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage, String op) throws IOException { + Message fromMessage, String op) throws IOException { String oldFolderId = (String) fromMessage.get("id"); String newFolderId = (String) fromMessage.get("name"); @@ -1013,76 +976,50 @@ public class NotebookServer extends WebSocketServlet } } - private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) { - boolean cronUpdated = false; - if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron") - .equals(configB.get("cron"))) { - cronUpdated = true; - } else if (configA.get("cron") == null && configB.get("cron") == null) { - cronUpdated = false; - } else if (configA.get("cron") != null || configB.get("cron") != null) { - cronUpdated = true; - } - - return cronUpdated; - } - - private void createNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message message) throws IOException { - AuthenticationInfo subject = new AuthenticationInfo(message.principal); + private void createNote(NotebookSocket conn, + Message message) throws IOException { - try { - Note note = null; - String defaultInterpreterSettingId = (String) message.get("defaultInterpreterGroup"); - if (defaultInterpreterSettingId != null) { - note = notebook.createNote(defaultInterpreterSettingId, subject); - } else { - note = notebook.createNote(subject); - } - - note.addNewParagraph(subject); // it's an empty note. so add one paragraph - if (message != null) { - String noteName = (String) message.get("name"); - if (StringUtils.isEmpty(noteName)) { - noteName = "Note " + note.getId(); - } - note.setName(noteName); - note.setCronSupported(notebook.getConf()); - } + String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup"); + String noteName = (String) message.get("name"); + getNotebookService().createNote(defaultInterpreterGroup, noteName, getServiceContext(message), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + addConnectionToNote(note.getId(), conn); + conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } - note.persist(subject); - addConnectionToNote(note.getId(), conn); - conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); - } catch (IOException e) { - LOG.error("Exception from createNote", e); - conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", - "Oops! There is something wrong with the notebook file system. " - + "Please check the logs for more details."))); - return; - } - broadcastNoteList(subject, userAndRoles); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + super.onFailure(ex, context); + conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", + "Oops! There is something wrong with the notebook file system. " + + "Please check the logs for more details."))); + } + }); } - private void removeNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void deleteNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; } - - if (!hasParagraphOwnerPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "remove")) { - return; - } - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - notebook.removeNote(noteId, subject); - removeNote(noteId); - broadcastNoteList(subject, userAndRoles); + getNotebookService().removeNote(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<String>(conn) { + @Override + public void onSuccess(String message, ServiceContext context) throws IOException { + super.onSuccess(message, context); + removeConn(noteId); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } private void removeFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String folderId = (String) fromMessage.get("id"); if (folderId == null) { return; @@ -1101,13 +1038,13 @@ public class NotebookServer extends WebSocketServlet AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); for (Note note : notes) { notebook.removeNote(note.getId(), subject); - removeNote(note.getId()); + removeConn(note.getId()); } broadcastNoteList(subject, userAndRoles); } private void moveNoteToTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws SchedulerException, IOException { + Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -1121,15 +1058,16 @@ public class NotebookServer extends WebSocketServlet notebook.removeCron(note.getId()); } - if (note != null && !note.isTrash()){ + if (note != null && !note.isTrash()) { fromMessage.put("name", Folder.TRASH_FOLDER_ID + "/" + note.getName()); - renameNote(conn, userAndRoles, notebook, fromMessage, "move"); + renameNote(conn, fromMessage); notebook.moveNoteToTrash(note.getId()); } } private void moveFolderToTrash(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws SchedulerException, IOException { + Notebook notebook, Message fromMessage) + throws SchedulerException, IOException { String folderId = (String) fromMessage.get("id"); if (folderId == null) { return; @@ -1138,14 +1076,14 @@ public class NotebookServer extends WebSocketServlet Folder folder = notebook.getFolder(folderId); if (folder != null && !folder.isTrash()) { String trashFolderId = Folder.TRASH_FOLDER_ID + "/" + folderId; - if (notebook.hasFolder(trashFolderId)){ + if (notebook.hasFolder(trashFolderId)) { DateTime currentDate = new DateTime(); DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); trashFolderId += Folder.TRASH_FOLDER_CONFLICT_INFIX + formatter.print(currentDate); } List<Note> noteList = folder.getNotesRecursively(); - for (Note note: noteList) { + for (Note note : noteList) { Map<String, Object> config = note.getConfig(); if (config.get("cron") != null) { notebook.removeCron(note.getId()); @@ -1157,30 +1095,20 @@ public class NotebookServer extends WebSocketServlet } } - private void restoreNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws SchedulerException, IOException { + private void restoreNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); - if (noteId == null) { return; } - Note note = notebook.getNote(noteId); - - //restore cron - Map<String, Object> config = note.getConfig(); - if (config.get("cron") != null) { - notebook.refreshCron(note.getId()); - } + getNotebookService().restoreNote(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn)); - if (note != null && note.isTrash()) { - fromMessage.put("name", note.getName().replaceFirst(Folder.TRASH_FOLDER_ID + "/", "")); - renameNote(conn, userAndRoles, notebook, fromMessage, "restore"); - } } private void restoreFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws SchedulerException, IOException { + Message fromMessage) throws SchedulerException, IOException { String folderId = (String) fromMessage.get("id"); if (folderId == null) { @@ -1211,7 +1139,7 @@ public class NotebookServer extends WebSocketServlet } private void restoreAll(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws SchedulerException, IOException { + Message fromMessage) throws IOException { Folder trashFolder = notebook.getFolder(Folder.TRASH_FOLDER_ID); if (trashFolder != null) { fromMessage.data = new HashMap<>(); @@ -1222,57 +1150,42 @@ public class NotebookServer extends WebSocketServlet } private void emptyTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws SchedulerException, IOException { + Message fromMessage) throws SchedulerException, IOException { fromMessage.data = new HashMap<>(); fromMessage.put("id", Folder.TRASH_FOLDER_ID); removeFolder(conn, userAndRoles, notebook, fromMessage); } - private void updateParagraph(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void updateParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - - Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); String noteId = getOpenNoteId(conn); if (noteId == null) { noteId = (String) fromMessage.get("noteId"); } - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return; - } - - final Note note = notebook.getNote(noteId); - Paragraph p = note.getParagraph(paragraphId); - - p.settings.setParams(params); - p.setConfig(config); - p.setTitle((String) fromMessage.get("title")); - p.setText((String) fromMessage.get("paragraph")); - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - if (note.isPersonalizedMode()) { - p = p.getUserParagraph(subject.getUser()); - p.settings.setParams(params); - p.setConfig(config); - p.setTitle((String) fromMessage.get("title")); - p.setText((String) fromMessage.get("paragraph")); - } - - note.persist(subject); + String title = (String) fromMessage.get("title"); + String text = (String) fromMessage.get("paragraph"); + Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); + Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); - if (note.isPersonalizedMode()) { - Map<String, Paragraph> userParagraphMap = - note.getParagraph(paragraphId).getUserParagraphMap(); - broadcastParagraphs(userParagraphMap, p); - } else { - broadcastParagraph(note, p); - } + getNotebookService().updateParagraph(noteId, paragraphId, title, text, params, config, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + if (p.getNote().isPersonalizedMode()) { + Map<String, Paragraph> userParagraphMap = + p.getNote().getParagraph(paragraphId).getUserParagraphMap(); + broadcastParagraphs(userParagraphMap, p); + } else { + broadcastParagraph(p.getNote(), p); + } + } + }); } private void patchParagraph(NotebookSocket conn, HashSet<String> userAndRoles, @@ -1327,142 +1240,140 @@ public class NotebookServer extends WebSocketServlet paragraphText = (String) dmp.patchApply(patches, paragraphText)[0]; p.setText(paragraphText); Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText) - .put("paragraphId", p.getId()); + .put("paragraphId", p.getId()); broadcastExcept(note.getId(), message, conn); } - private void cloneNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void cloneNote(NotebookSocket conn, + Message fromMessage) throws IOException { + String noteId = getOpenNoteId(conn); String name = (String) fromMessage.get("name"); - Note newNote = notebook.cloneNote(noteId, name, new AuthenticationInfo(fromMessage.principal)); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - addConnectionToNote(newNote.getId(), (NotebookSocket) conn); - conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); - broadcastNoteList(subject, userAndRoles); + getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note newNote, ServiceContext context) throws IOException { + super.onSuccess(newNote, context); + addConnectionToNote(newNote.getId(), conn); + conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } - private void clearAllParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void clearAllParagraphOutput(NotebookSocket conn, + Message fromMessage) throws IOException { final String noteId = (String) fromMessage.get("id"); if (StringUtils.isBlank(noteId)) { return; } + getNotebookService().clearAllParagraphOutput(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + broadcastNote(note); + } + }); + } - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "clear output")) { - return; - } + protected Note importNote(NotebookSocket conn, Message fromMessage) throws IOException { + String noteName = (String) ((Map) fromMessage.get("note")).get("name"); + String noteJson = gson.toJson(fromMessage.get("note")); + Note note = getNotebookService().importNote(noteName, noteJson, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + try { + broadcastNote(note); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } catch (NullPointerException e) { + // TODO(zjffdu) remove this try catch. This is only for test of + // NotebookServerTest#testImportNotebook + } + } + }); - Note note = notebook.getNote(noteId); - note.clearAllParagraphOutput(); - broadcastNote(note); - } - - protected Note importNote(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { - Note note = null; - if (fromMessage != null) { - String noteName = (String) ((Map) fromMessage.get("note")).get("name"); - String noteJson = gson.toJson(fromMessage.get("note")); - AuthenticationInfo subject; - if (fromMessage.principal != null) { - subject = new AuthenticationInfo(fromMessage.principal); - } else { - subject = new AuthenticationInfo("anonymous"); - } - note = notebook.importNote(noteJson, noteName, subject); - note.persist(subject); - broadcastNote(note); - broadcastNoteList(subject, userAndRoles); - } return note; } - private void removeParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void removeParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } String noteId = getOpenNoteId(conn); - - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return; - } - - final Note note = notebook.getNote(noteId); - - // Don't allow removing paragraph when there is only one paragraph in the Notebook - if (note.getParagraphCount() > 1) { - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - Paragraph para = note.removeParagraph(subject.getUser(), paragraphId); - note.persist(subject); - if (para != null) { - broadcast(note.getId(), new Message(OP.PARAGRAPH_REMOVED). - put("id", para.getId())); - } - } + getNotebookService().removeParagraph(noteId, paragraphId, + getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){ + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED). + put("id", p.getId())); + } + }); } - private void clearParagraphOutput(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void clearParagraphOutput(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - String noteId = getOpenNoteId(conn); - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return; - } - - final Note note = notebook.getNote(noteId); - if (note.isPersonalizedMode()) { - String user = fromMessage.principal; - Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user); - unicastParagraph(note, p, user); - } else { - note.clearParagraphOutput(paragraphId); - Paragraph paragraph = note.getParagraph(paragraphId); - broadcastParagraph(note, paragraph); - } + getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + if (p.getNote().isPersonalizedMode()) { + unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); + } else { + broadcastParagraph(p.getNote(), p); + } + } + }); } - private void completion(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void completion(NotebookSocket conn, + Message fromMessage) throws IOException { + String noteId = getOpenNoteId(conn); String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); - Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); - if (paragraphId == null) { - conn.send(serializeMessage(resp)); - return; - } + getNotebookService().completion(noteId, paragraphId, buffer, cursor, + getServiceContext(fromMessage), + new WebSocketServiceCallback<List<InterpreterCompletion>>(conn) { + @Override + public void onSuccess(List<InterpreterCompletion> completions, ServiceContext context) + throws IOException { + super.onSuccess(completions, context); + Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); + resp.put("completions", completions); + conn.send(serializeMessage(resp)); + } - final Note note = notebook.getNote(getOpenNoteId(conn)); - List<InterpreterCompletion> candidates; - try { - candidates = note.completion(paragraphId, buffer, cursor); - } catch (RuntimeException e) { - LOG.info("Fail to get completion", e); - candidates = new ArrayList<>(); - } - resp.put("completions", candidates); - conn.send(serializeMessage(resp)); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + super.onFailure(ex, context); + Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); + resp.put("completions", new ArrayList<>()); + conn.send(serializeMessage(resp)); + } + }); } /** * When angular object updated from client. * - * @param conn the web socket. - * @param notebook the notebook. + * @param conn the web socket. + * @param notebook the notebook. * @param fromMessage the message. */ private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) { + Notebook notebook, Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String paragraphId = (String) fromMessage.get("paragraphId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); @@ -1547,7 +1458,7 @@ public class NotebookServer extends WebSocketServlet * and a paragraph id. */ protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws Exception { + Notebook notebook, Message fromMessage) throws Exception { String noteId = fromMessage.getType("noteId"); String varName = fromMessage.getType("name"); Object varValue = fromMessage.get("value"); @@ -1581,7 +1492,8 @@ public class NotebookServer extends WebSocketServlet * and an optional list of paragraph id(s). */ protected void angularObjectClientUnbind(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws Exception { + Notebook notebook, Message fromMessage) + throws Exception { String noteId = fromMessage.getType("noteId"); String varName = fromMessage.getType("name"); String paragraphId = fromMessage.getType("paragraphId"); @@ -1609,7 +1521,7 @@ public class NotebookServer extends WebSocketServlet } private InterpreterGroup findInterpreterGroupForParagraph(Note note, String paragraphId) - throws Exception { + throws Exception { final Paragraph paragraph = note.getParagraph(paragraphId); if (paragraph == null) { throw new IllegalArgumentException("Unknown paragraph with id : " + paragraphId); @@ -1618,8 +1530,10 @@ public class NotebookServer extends WebSocketServlet } private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName, - Object varValue, RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId, - NotebookSocket conn) { + Object varValue, + RemoteAngularObjectRegistry remoteRegistry, + String interpreterGroupId, + NotebookSocket conn) { final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId); @@ -1629,8 +1543,9 @@ public class NotebookServer extends WebSocketServlet } private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName, - RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId, - NotebookSocket conn) { + RemoteAngularObjectRegistry remoteRegistry, + String interpreterGroupId, + NotebookSocket conn) { final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId); this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) @@ -1639,8 +1554,9 @@ public class NotebookServer extends WebSocketServlet } private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName, - Object varValue, AngularObjectRegistry registry, String interpreterGroupId, - NotebookSocket conn) { + Object varValue, AngularObjectRegistry registry, + String interpreterGroupId, + NotebookSocket conn) { AngularObject angularObject = registry.get(varName, noteId, paragraphId); if (angularObject == null) { angularObject = registry.add(varName, varValue, noteId, paragraphId); @@ -1655,7 +1571,8 @@ public class NotebookServer extends WebSocketServlet } private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName, - AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) { + AngularObjectRegistry registry, + String interpreterGroupId, NotebookSocket conn) { final AngularObject removed = registry.remove(varName, noteId, paragraphId); if (removed != null) { this.broadcastExcept(noteId, @@ -1665,8 +1582,8 @@ public class NotebookServer extends WebSocketServlet } } - private void moveParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void moveParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -1674,31 +1591,22 @@ public class NotebookServer extends WebSocketServlet final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); String noteId = getOpenNoteId(conn); - final Note note = notebook.getNote(noteId); - - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return; - } - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - note.moveParagraph(paragraphId, newIndex); - note.persist(subject); - broadcast(note.getId(), - new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); + getNotebookService().moveParagraph(noteId, paragraphId, newIndex, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph result, ServiceContext context) throws IOException { + super.onSuccess(result, context); + broadcast(result.getNote().getId(), + new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); + } + }); } - private String insertParagraph(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private String insertParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); String noteId = getOpenNoteId(conn); - final Note note = notebook.getNote(noteId); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return null; - } Map<String, Object> config; if (fromMessage.get("config") != null) { config = (Map<String, Object>) fromMessage.get("config"); @@ -1706,85 +1614,59 @@ public class NotebookServer extends WebSocketServlet config = new HashMap<>(); } - Paragraph newPara = note.insertNewParagraph(index, subject); - newPara.setConfig(config); - note.persist(subject); - broadcastNewParagraph(note, newPara); + Paragraph newPara = getNotebookService().insertParagraph(noteId, index, config, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + broadcastNewParagraph(p.getNote(), p); + } + }); return newPara.getId(); } - private void copyParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { - String newParaId = insertParagraph(conn, userAndRoles, notebook, fromMessage); + private void copyParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { + String newParaId = insertParagraph(conn, fromMessage); if (newParaId == null) { return; } fromMessage.put("id", newParaId); - updateParagraph(conn, userAndRoles, notebook, fromMessage); + updateParagraph(conn, fromMessage); } - private void cancelParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } String noteId = getOpenNoteId(conn); - - if (!hasParagraphRunnerPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return; - } - - final Note note = notebook.getNote(noteId); - Paragraph p = note.getParagraph(paragraphId); - p.abort(); + getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage), + new WebSocketServiceCallback<>(conn)); } - private void runAllParagraphs(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void runAllParagraphs(NotebookSocket conn, + Message fromMessage) throws IOException { final String noteId = (String) fromMessage.get("noteId"); if (StringUtils.isBlank(noteId)) { return; } - - if (!hasParagraphRunnerPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "run all paragraphs")) { - return; - } - List<Map<String, Object>> paragraphs = gson.fromJson(String.valueOf(fromMessage.data.get("paragraphs")), - new TypeToken<List<Map<String, Object>>>() {}.getType()); - - for (Map<String, Object> raw : paragraphs) { - String paragraphId = (String) raw.get("id"); - if (paragraphId == null) { - continue; - } - - String text = (String) raw.get("paragraph"); - String title = (String) raw.get("title"); - Map<String, Object> params = (Map<String, Object>) raw.get("params"); - Map<String, Object> config = (Map<String, Object>) raw.get("config"); - - Note note = notebook.getNote(noteId); - Paragraph p = setParagraphUsingMessage(note, fromMessage, - paragraphId, text, title, params, config); + new TypeToken<List<Map<String, Object>>>() { + }.getType()); - if (p.isEnabled() && !persistAndExecuteSingleParagraph(conn, note, p, true)) { - // stop execution when one paragraph fails. - break; - } - } + getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn)); } private void broadcastSpellExecution(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -1837,39 +1719,40 @@ public class NotebookServer extends WebSocketServlet new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn); } - private void runParagraph(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void runParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { + //TODO(zjffdu) it is possible ? return; } String noteId = getOpenNoteId(conn); - - if (!hasParagraphRunnerPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "write")) { - return; - } - - // 1. clear paragraph only if personalized, - // otherwise this will be handed in `onOutputClear` - final Note note = notebook.getNote(noteId); - if (note.isPersonalizedMode()) { - String user = fromMessage.principal; - Paragraph p = note.clearPersonalizedParagraphOutput(paragraphId, user); - unicastParagraph(note, p, user); - } - - // 2. set paragraph values String text = (String) fromMessage.get("paragraph"); String title = (String) fromMessage.get("title"); Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, false, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + if (p.getNote().isPersonalizedMode()) { + Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId, + context.getAutheInfo().getUser()); + unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); + } - Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId, - text, title, params, config); - - persistAndExecuteSingleParagraph(conn, note, p, false); + // if it's the last paragraph and not empty, let's add a new one + boolean isTheLastParagraph = p.getNote().isLastParagraph(paragraphId); + if (!(Strings.isNullOrEmpty(p.getText()) || + Strings.isNullOrEmpty(p.getScriptText())) && + isTheLastParagraph) { + Paragraph newPara = p.getNote().addNewParagraph(p.getAuthenticationInfo()); + broadcastNewParagraph(p.getNote(), newPara); + } + } + }); } private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) { @@ -1887,7 +1770,7 @@ public class NotebookServer extends WebSocketServlet * @return false if failed to save a note */ private boolean persistNoteWithAuthInfo(NotebookSocket conn, Note note, Paragraph p) - throws IOException { + throws IOException { try { note.persist(p.getAuthenticationInfo()); return true; @@ -1901,28 +1784,9 @@ public class NotebookServer extends WebSocketServlet } } - private boolean persistAndExecuteSingleParagraph(NotebookSocket conn, Note note, Paragraph p, - boolean blocking) throws IOException { - addNewParagraphIfLastParagraphIsExecuted(note, p); - if (!persistNoteWithAuthInfo(conn, note, p)) { - return false; - } - - try { - return note.run(p.getId(), blocking); - } catch (Exception ex) { - LOG.error("Exception from run", ex); - if (p != null) { - p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex); - p.setStatus(Status.ERROR); - broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p)); - } - return false; - } - } - private Paragraph setParagraphUsingMessage(Note note, Message fromMessage, String paragraphId, - String text, String title, Map<String, Object> params, Map<String, Object> config) { + String text, String title, Map<String, Object> params, + Map<String, Object> config) { Paragraph p = note.getParagraph(paragraphId); p.setText(text); p.setTitle(title); @@ -1945,7 +1809,7 @@ public class NotebookServer extends WebSocketServlet } private void sendAllConfigurations(NotebookSocket conn, HashSet<String> userAndRoles, - Notebook notebook) throws IOException { + Notebook notebook) throws IOException { ZeppelinConfiguration conf = notebook.getConf(); Map<String, String> configurations = @@ -1962,97 +1826,94 @@ public class NotebookServer extends WebSocketServlet new Message(OP.CONFIGURATIONS_INFO).put("configurations", configurations))); } - private void checkpointNote(NotebookSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void checkpointNote(NotebookSocket conn, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); String commitMessage = (String) fromMessage.get("commitMessage"); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - Revision revision = notebook.checkpointNote(noteId, commitMessage, subject); - if (!Revision.isEmpty(revision)) { - List<Revision> revisions = notebook.listRevisionHistory(noteId, subject); - conn.send( - serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); - } else { - conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", - "Couldn't checkpoint note revision: possibly storage doesn't support versioning. " - + "Please check the logs for more details."))); - } + + getNotebookService().checkpointNote(noteId, commitMessage, getServiceContext(fromMessage), + new WebSocketServiceCallback<Revision>(conn) { + @Override + public void onSuccess(Revision revision, ServiceContext context) throws IOException { + super.onSuccess(revision, context); + if (!Revision.isEmpty(revision)) { + List<Revision> revisions = + notebook().listRevisionHistory(noteId, context.getAutheInfo()); + conn.send( + serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", + revisions))); + } else { + conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", + "Couldn't checkpoint note revision: possibly storage doesn't support versioning. " + + "Please check the logs for more details."))); + } + } + }); } - private void listRevisionHistory(NotebookSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void listRevisionHistory(NotebookSocket conn, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - List<Revision> revisions = notebook.listRevisionHistory(noteId, subject); - - conn.send( - serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); + getNotebookService().listRevisionHistory(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<List<Revision>>(conn) { + @Override + public void onSuccess(List<Revision> revisions, ServiceContext context) + throws IOException { + super.onSuccess(revisions, context); + conn.send(serializeMessage( + new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); + } + }); } - private void setNoteRevision(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void setNoteRevision(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "update")) { - return; - } - - Note headNote = null; - boolean setRevisionStatus; - try { - headNote = notebook.setNoteRevision(noteId, revisionId, subject); - setRevisionStatus = headNote != null; - } catch (Exception e) { - setRevisionStatus = false; - LOG.error("Failed to set given note revision", e); - } - if (setRevisionStatus) { - notebook.loadNoteFromRepo(noteId, subject); - } - - conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", setRevisionStatus))); - - if (setRevisionStatus) { - Note reloadedNote = notebook.getNote(headNote.getId()); - broadcastNote(reloadedNote); - } else { - conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", - "Couldn't set note to the given revision. " - + "Please check the logs for more details."))); - } + getNotebookService().setNoteRevision(noteId, revisionId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + Note reloadedNote = notebook().loadNoteFromRepo(noteId, context.getAutheInfo()); + conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", true))); + broadcastNote(reloadedNote); + } + }); } - private void getNoteByRevision(NotebookSocket conn, Notebook notebook, Message fromMessage) + private void getNoteByRevision(NotebookSocket conn, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - Note revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject); - conn.send(serializeMessage( - new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId) - .put("note", revisionNote))); + getNotebookService().getNotebyRevision(noteId, revisionId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + conn.send(serializeMessage( + new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId) + .put("note", note))); + } + }); } - private void getNoteByRevisionForCompare(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + private void getNoteByRevisionForCompare(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); - String position = (String) fromMessage.get("position"); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - Note revisionNote; - if (revisionId.equals("Head")) { - revisionNote = notebook.getNote(noteId); - } else { - revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject); - } - - conn.send(serializeMessage( - new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId) - .put("revisionId", revisionId).put("position", position).put("note", revisionNote))); + getNotebookService().getNoteByRevisionForCompare(noteId, revisionId, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + conn.send(serializeMessage( + new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId) + .put("revisionId", revisionId).put("position", position).put("note", note))); + } + }); } /** @@ -2074,7 +1935,7 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputUpdated(String noteId, String paragraphId, int index, - InterpreterResult.Type type, String output) { + InterpreterResult.Type type, String output) { Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output); Note note = notebook().getNote(noteId); @@ -2105,7 +1966,7 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputAppend(String noteId, String paragraphId, int index, String appId, - String output) { + String output) { Message msg = new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) .put("index", index).put("appId", appId).put("data", output); @@ -2117,7 +1978,7 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputUpdated(String noteId, String paragraphId, int index, String appId, - InterpreterResult.Type type, String output) { + InterpreterResult.Type type, String output) { Message msg = new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) .put("index", index).put("type", type).put("appId", appId).put("data", output); @@ -2322,7 +2183,7 @@ public class NotebookServer extends WebSocketServlet if (job.getStatus() == Status.FINISHED) { LOG.info("Job {} is finished successfully, status: {}", job.getId(), job.getStatus()); } else { - LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}" , job.getId(), + LOG.warn("Job {} is finished, status: {}, exception: {}, result: {}", job.getId(), job.getStatus(), job.getException(), job.getReturn()); } @@ -2484,7 +2345,7 @@ public class NotebookServer extends WebSocketServlet } } - private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject) + private void getInterpreterSettings(NotebookSocket conn) throws IOException { List<InterpreterSetting> availableSettings = notebook().getInterpreterSettingManager().get(); conn.send(serializeMessage( @@ -2525,7 +2386,7 @@ public class NotebookServer extends WebSocketServlet private void broadcastToAllConnectionsExcept(NotebookSocket exclude, String serialized) { synchronized (connectedSockets) { - for (NotebookSocket conn: connectedSockets) { + for (NotebookSocket conn : connectedSockets) { if (exclude != null && exclude.equals(conn)) { continue; } @@ -2555,7 +2416,7 @@ public class NotebookServer extends WebSocketServlet @Override public void onParaInfosReceived(String noteId, String paragraphId, - String interpreterSettingId, Map<String, String> metaInfos) { + String interpreterSettingId, Map<String, String> metaInfos) { Note note = notebook().getNote(noteId); if (note != null) { Paragraph paragraph = note.getParagraph(paragraphId); @@ -2601,53 +2462,39 @@ public class NotebookServer extends WebSocketServlet setting.clearNoteIdAndParaMap(); } - public void broadcastNoteForms(Note note) { + private void broadcastNoteForms(Note note) { GUI formsSettings = new GUI(); formsSettings.setForms(note.getNoteForms()); formsSettings.setParams(note.getNoteParams()); - broadcast(note.getId(), new Message(OP.SAVE_NOTE_FORMS).put("formsData", formsSettings)); } - private void saveNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void saveNoteForms(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); Map<String, Object> noteParams = (Map<String, Object>) fromMessage.get("noteParams"); - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "update")) { - return; - } - - Note note = notebook.getNote(noteId); - if (note != null) { - note.setNoteParams(noteParams); - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - note.persist(subject); - broadcastNoteForms(note); - } + getNotebookService().saveNoteForms(noteId, noteParams, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + broadcastNoteForms(note); + } + }); } - private void removeNoteForms(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, - Message fromMessage) throws IOException { + private void removeNoteForms(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String formName = (String) fromMessage.get("formName"); - if (!hasParagraphWriterPermission(conn, notebook, noteId, - userAndRoles, fromMessage.principal, "update")) { - return; - } - - Note note = notebook.getNote(noteId); - if (note != null) { - note.getNoteForms().remove(formName); - note.getNoteParams().remove(formName); - - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - note.persist(subject); - broadcastNoteForms(note); - } + getNotebookService().removeNoteForms(noteId, formName, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + broadcastNoteForms(note); + } + }); } @Override @@ -2665,4 +2512,43 @@ public class NotebookServer extends WebSocketServlet m.data.put("notice", message); broadcast(m); } + + private ServiceContext getServiceContext(Message message) { + AuthenticationInfo authInfo = + new AuthenticationInfo(message.principal, message.roles, message.ticket); + Set<String> userAndRoles = new HashSet<>(); + userAndRoles.add(message.principal); + if (message.roles != null && !message.roles.equals("")) { + HashSet<String> roles = + gson.fromJson(message.roles, new TypeToken<HashSet<String>>() { + }.getType()); + if (roles != null) { + userAndRoles.addAll(roles); + } + } + return new ServiceContext(authInfo, userAndRoles); + } + + private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> { + + private NotebookSocket conn; + + WebSocketServiceCallback(NotebookSocket conn) { + this.conn = conn; + } + + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + super.onFailure(ex, context); + if (ex instanceof ForbiddenException) { + Type type = new TypeToken<Map<String, String>>(){}.getType(); + Map<String, String> jsonObject = + gson.fromJson(((ForbiddenException) ex).getResponse().getEntity().toString(), type); + conn.send(serializeMessage(new Message(OP.AUTH_INFO) + .put("info", jsonObject.get("message")))); + } else { + conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", ex.getMessage()))); + } + } + } }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index 66e4038..65740ff 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -77,4 +77,9 @@ public class NotebookSocket extends WebSocketAdapter { public void setUser(String user) { this.user = user; } + + @Override + public String toString() { + return request.getRemoteHost() + ":" + request.getRemotePort(); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java deleted file mode 100644 index ddfe7c4..0000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ServiceCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.socket; - -/** This will be used by some services to pass messages to frontend via WebSocket */ -public interface ServiceCallback { - void onStart(String message); - - void onSuccess(String message); - - void onFailure(String message); -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java index ce867ee..62a3782 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinRestApiTest.java @@ -221,8 +221,7 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { @Test public void testDeleteNoteBadId() throws IOException { LOG.info("testDeleteNoteBadId"); - testDeleteNote("2AZFXEX97"); - testDeleteNote("bad_ID"); + testDeleteNotExistNote("bad_ID"); } @Test @@ -317,6 +316,13 @@ public class ZeppelinRestApiTest extends AbstractTestRestApi { } } + private void testDeleteNotExistNote(String noteId) throws IOException { + DeleteMethod delete = httpDelete(("/notebook/" + noteId)); + LOG.info("testDeleteNote delete response\n" + delete.getResponseBodyAsString()); + assertThat("Test delete method:", delete, isNotFound()); + delete.releaseConnection(); + } + @Test public void testCloneNote() throws IOException, IllegalArgumentException { LOG.info("testCloneNote"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java index ab74012..cc4b1df 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java @@ -33,7 +33,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.rest.message.InterpreterInstallationRequest; -import org.apache.zeppelin.socket.ServiceCallback; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -120,19 +119,19 @@ public class InterpreterServiceTest { new InterpreterInstallationRequest(interpreterName, artifactName), dependencyResolver, specificInterpreterPath, - new ServiceCallback() { + new SimpleServiceCallback<String>() { @Override - public void onStart(String message) { + public void onStart(String message, ServiceContext context) { assertEquals("Starting to download " + interpreterName + " interpreter", message); } @Override - public void onSuccess(String message) { + public void onSuccess(String message, ServiceContext context) { assertEquals(interpreterName + " downloaded", message); } @Override - public void onFailure(String message) { + public void onFailure(Exception ex, ServiceContext context) { fail(); } });