ZEPPELIN-3681. Introduce NotebookService ### What is this PR for? This is the first phase of the refactoring work for ZEPPELIN-3288. The background of ZEPPELIN-3288 is that Zeppelin provides two kinds of API (REST API and WebSocket), both of which invoke the same backend logic. This PR puts all the notebook-related operations into the class NotebookService, which is called by both NotebookServer and NotebookRestApi.
### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3288 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3119 from zjffdu/ZEPPELIN-3681 and squashes the following commits: 6dda42ef8 [Jeff Zhang] ZEPPELIN-3681. Introduce NotebookService Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/6beb1bb3 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/6beb1bb3 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/6beb1bb3 Branch: refs/heads/master Commit: 6beb1bb3c36892bb0466b60ec142c5597f6e8f9e Parents: e740efb Author: Jeff Zhang <zjf...@apache.org> Authored: Thu Aug 2 17:50:17 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Aug 10 14:54:33 2018 +0800 ---------------------------------------------------------------------- .../zeppelin/rest/InterpreterRestApi.java | 51 +- .../apache/zeppelin/rest/NotebookRestApi.java | 194 +-- .../rest/exception/ForbiddenException.java | 13 +- .../rest/exception/NotFoundException.java | 59 - .../rest/exception/NoteNotFoundException.java | 32 + .../exception/ParagraphNotFoundException.java | 31 + .../zeppelin/service/InterpreterService.java | 16 +- .../zeppelin/service/NotebookService.java | 793 ++++++++++++ .../zeppelin/service/ServiceCallback.java | 51 + .../apache/zeppelin/service/ServiceContext.java | 45 + .../zeppelin/service/SimpleServiceCallback.java | 49 + .../apache/zeppelin/socket/NotebookServer.java | 1150 ++++++++---------- .../apache/zeppelin/socket/NotebookSocket.java | 5 + .../apache/zeppelin/socket/ServiceCallback.java | 27 - .../zeppelin/rest/ZeppelinRestApiTest.java | 10 +- .../service/InterpreterServiceTest.java | 9 
+- .../zeppelin/socket/NotebookServerTest.java | 60 +- .../interpreter/InterpreterFactory.java | 8 +- .../org/apache/zeppelin/notebook/Notebook.java | 6 +- 19 files changed, 1707 insertions(+), 902 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index b6a39b3..16d39d8 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -18,19 +18,30 @@ package org.apache.zeppelin.rest; import com.google.common.collect.Maps; -import javax.validation.constraints.NotNull; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.zeppelin.annotation.ZeppelinApi; +import org.apache.zeppelin.dep.Repository; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterPropertyType; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; -import org.apache.zeppelin.socket.ServiceCallback; +import org.apache.zeppelin.rest.message.InterpreterInstallationRequest; +import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; +import org.apache.zeppelin.rest.message.RestartInterpreterRequest; +import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest; +import org.apache.zeppelin.server.JsonResponse; +import org.apache.zeppelin.service.InterpreterService; +import 
org.apache.zeppelin.service.ServiceContext; +import org.apache.zeppelin.service.SimpleServiceCallback; +import org.apache.zeppelin.socket.NotebookServer; +import org.apache.zeppelin.utils.SecurityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.repository.RemoteRepository; -import java.io.IOException; -import java.util.List; -import java.util.Map; - +import javax.validation.constraints.NotNull; import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -40,21 +51,9 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; - -import org.apache.zeppelin.annotation.ZeppelinApi; -import org.apache.zeppelin.dep.Repository; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterPropertyType; -import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.rest.message.InterpreterInstallationRequest; -import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; -import org.apache.zeppelin.rest.message.RestartInterpreterRequest; -import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest; -import org.apache.zeppelin.server.JsonResponse; -import org.apache.zeppelin.service.InterpreterService; -import org.apache.zeppelin.socket.NotebookServer; -import org.apache.zeppelin.utils.SecurityUtils; +import java.io.IOException; +import java.util.List; +import java.util.Map; /** * Interpreter Rest API. 
@@ -292,9 +291,9 @@ public class InterpreterRestApi { try { interpreterService.installInterpreter( request, - new ServiceCallback() { + new SimpleServiceCallback<String>() { @Override - public void onStart(String message) { + public void onStart(String message, ServiceContext context) { Message m = new Message(OP.INTERPRETER_INSTALL_STARTED); Map<String, Object> data = Maps.newHashMap(); data.put("result", "Starting"); @@ -304,7 +303,7 @@ public class InterpreterRestApi { } @Override - public void onSuccess(String message) { + public void onSuccess(String message, ServiceContext context) { Message m = new Message(OP.INTERPRETER_INSTALL_RESULT); Map<String, Object> data = Maps.newHashMap(); data.put("result", "Succeed"); @@ -314,11 +313,11 @@ public class InterpreterRestApi { } @Override - public void onFailure(String message) { + public void onFailure(Exception ex, ServiceContext context) { Message m = new Message(OP.INTERPRETER_INSTALL_RESULT); Map<String, Object> data = Maps.newHashMap(); data.put("result", "Failed"); - data.put("message", message); + data.put("message", ex.getMessage()); m.data = data; notebookServer.broadcast(m); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index f23ec0e..90647f4 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -29,7 +29,8 @@ import org.apache.zeppelin.notebook.NotebookAuthorization; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.rest.exception.BadRequestException; import org.apache.zeppelin.rest.exception.ForbiddenException; -import 
org.apache.zeppelin.rest.exception.NotFoundException; +import org.apache.zeppelin.rest.exception.NoteNotFoundException; +import org.apache.zeppelin.rest.exception.ParagraphNotFoundException; import org.apache.zeppelin.rest.message.CronRequest; import org.apache.zeppelin.rest.message.NewNoteRequest; import org.apache.zeppelin.rest.message.NewParagraphRequest; @@ -38,6 +39,9 @@ import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest; import org.apache.zeppelin.rest.message.UpdateParagraphRequest; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.server.JsonResponse; +import org.apache.zeppelin.service.NotebookService; +import org.apache.zeppelin.service.ServiceContext; +import org.apache.zeppelin.service.SimpleServiceCallback; import org.apache.zeppelin.socket.NotebookServer; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.utils.SecurityUtils; @@ -53,13 +57,14 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; @@ -75,6 +80,7 @@ public class NotebookRestApi { private NotebookServer notebookServer; private SearchService noteSearchService; private NotebookAuthorization notebookAuthorization; + private NotebookService notebookService; public NotebookRestApi() { } @@ -82,6 +88,7 @@ public class NotebookRestApi { public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) { this.notebook = notebook; this.notebookServer = notebookServer; + this.notebookService = new NotebookService(notebook); this.noteSearchService = search; this.notebookAuthorization = 
notebook.getNotebookAuthorization(); } @@ -183,7 +190,7 @@ public class NotebookRestApi { private void checkIfNoteIsNotNull(Note note) { if (note == null) { - throw new NotFoundException("note not found"); + throw new NoteNotFoundException("note not found"); } } @@ -196,7 +203,7 @@ public class NotebookRestApi { private void checkIfParagraphIsNotNull(Paragraph paragraph) { if (paragraph == null) { - throw new NotFoundException("paragraph not found"); + throw new ParagraphNotFoundException("paragraph not found"); } } @@ -276,11 +283,8 @@ public class NotebookRestApi { @Path("/") @ZeppelinApi public Response getNoteList() throws IOException { - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - HashSet<String> userAndRoles = SecurityUtils.getAssociatedRoles(); - userAndRoles.add(subject.getUser()); - List<Map<String, String>> notesInfo = notebookServer.generateNotesInfo(false, subject, - userAndRoles); + List<Map<String, String>> notesInfo = notebookService.listNotes(false, getServiceContext(), + new RestServiceCallback<List<Map<String, String>>>()); return new JsonResponse<>(Status.OK, "", notesInfo).build(); } @@ -288,10 +292,8 @@ public class NotebookRestApi { @Path("{noteId}") @ZeppelinApi public Response getNote(@PathParam("noteId") String noteId) throws IOException { - Note note = notebook.getNote(noteId); - checkIfNoteIsNotNull(note); - checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this note"); - + Note note = + notebookService.getNote(noteId, getServiceContext(), new RestServiceCallback<Note>()); return new JsonResponse<>(Status.OK, "", note).build(); } @@ -314,17 +316,17 @@ public class NotebookRestApi { /** * import new note REST API. 
* - * @param req - note Json + * @param noteJson - note Json * @return JSON with new note ID * @throws IOException */ @POST @Path("import") @ZeppelinApi - public Response importNote(String req) throws IOException { - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - Note newNote = notebook.importNote(req, null, subject); - return new JsonResponse<>(Status.OK, "", newNote.getId()).build(); + public Response importNote(String noteJson) throws IOException { + Note note = notebookService.importNote(null, noteJson, getServiceContext(), + new RestServiceCallback<Note>()); + return new JsonResponse<>(Status.OK, "", note.getId()).build(); } /** @@ -378,16 +380,15 @@ public class NotebookRestApi { @ZeppelinApi public Response deleteNote(@PathParam("noteId") String noteId) throws IOException { LOG.info("Delete note {} ", noteId); - checkIfUserIsOwner(noteId, "Insufficient privileges you cannot delete this note"); - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - if (!(noteId.isEmpty())) { - Note note = notebook.getNote(noteId); - if (note != null) { - notebook.removeNote(noteId, subject); - } - } + notebookService.removeNote(noteId, + getServiceContext(), + new RestServiceCallback<String>() { + @Override + public void onSuccess(String message, ServiceContext context) { + notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); - notebookServer.broadcastNoteList(subject, SecurityUtils.getAssociatedRoles()); return new JsonResponse<>(Status.OK, "").build(); } @@ -413,9 +414,14 @@ public class NotebookRestApi { newNoteName = request.getName(); } AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - Note newNote = notebook.cloneNote(noteId, newNoteName, subject); - notebookServer.broadcastNote(newNote); - notebookServer.broadcastNoteList(subject, SecurityUtils.getAssociatedRoles()); + Note newNote = notebookService.cloneNote(noteId, 
newNoteName, getServiceContext(), + new SimpleServiceCallback<Note>(){ + @Override + public void onSuccess(Note newNote, ServiceContext context) throws IOException { + notebookServer.broadcastNote(newNote); + notebookServer.broadcastNoteList(subject, context.getUserAndRoles()); + } + }); return new JsonResponse<>(Status.OK, "", newNote.getId()).build(); } @@ -433,21 +439,20 @@ public class NotebookRestApi { String message) throws IOException { LOG.info("rename note by JSON {}", message); RenameNoteRequest request = gson.fromJson(message, RenameNoteRequest.class); - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - - checkIfUserCanWrite(noteId, "Insufficient privileges you cannot rename this note"); - Note note = notebook.getNote(noteId); - checkIfNoteIsNotNull(note); String newName = request.getName(); if (newName.isEmpty()) { LOG.warn("Trying to rename notebook {} with empty name parameter", noteId); throw new BadRequestException("name can not be empty"); } - note.setName(newName); - - notebookServer.broadcastNote(note); - notebookServer.broadcastNoteList(subject, SecurityUtils.getAssociatedRoles()); + notebookService.renameNote(noteId, request.getName(), getServiceContext(), + new RestServiceCallback<Note>(){ + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + notebookServer.broadcastNote(note); + notebookServer.broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); return new JsonResponse(Status.OK, "").build(); } @@ -462,7 +467,7 @@ public class NotebookRestApi { @Path("{noteId}/paragraph") @ZeppelinApi public Response insertParagraph(@PathParam("noteId") String noteId, String message) - throws IOException { + throws IOException { String user = SecurityUtils.getPrincipal(); LOG.info("insert paragraph {} {}", noteId, message); @@ -513,7 +518,6 @@ public class NotebookRestApi { * * @param message json containing the "text" and optionally the "title" of the 
paragraph, e.g. * {"text" : "updated text", "title" : "Updated title" } - * */ @PUT @Path("{noteId}/paragraph/{paragraphId}") @@ -585,20 +589,16 @@ public class NotebookRestApi { checkIfNoteIsNotNull(note); checkIfUserCanWrite(noteId, "Insufficient privileges you cannot move paragraph"); - Paragraph p = note.getParagraph(paragraphId); - checkIfParagraphIsNotNull(p); + notebookService.moveParagraph(noteId, paragraphId, Integer.parseInt(newIndex), + getServiceContext(), + new RestServiceCallback<Paragraph>() { + @Override + public void onSuccess(Paragraph result, ServiceContext context) throws IOException { + notebookServer.broadcastNote(result.getNote()); + } + }); + return new JsonResponse(Status.OK, "").build(); - try { - note.moveParagraph(paragraphId, Integer.parseInt(newIndex), true); - - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - note.persist(subject); - notebookServer.broadcastNote(note); - return new JsonResponse(Status.OK, "").build(); - } catch (IndexOutOfBoundsException e) { - LOG.error("Exception in NotebookRestApi while moveParagraph ", e); - return new JsonResponse(Status.BAD_REQUEST, "paragraph's new index is out of bound").build(); - } } /** @@ -615,18 +615,13 @@ public class NotebookRestApi { @PathParam("paragraphId") String paragraphId) throws IOException { LOG.info("delete paragraph {} {}", noteId, paragraphId); - Note note = notebook.getNote(noteId); - checkIfNoteIsNotNull(note); - checkIfUserCanRead(noteId, - "Insufficient privileges you cannot remove paragraph from this note"); - - Paragraph p = note.getParagraph(paragraphId); - checkIfParagraphIsNotNull(p); - - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - note.removeParagraph(SecurityUtils.getPrincipal(), paragraphId); - note.persist(subject); - notebookServer.broadcastNote(note); + notebookService.removeParagraph(noteId, paragraphId, getServiceContext(), + new RestServiceCallback<Paragraph>() { + @Override + 
public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + notebookServer.broadcastNote(p.getNote()); + } + }); return new JsonResponse(Status.OK, "").build(); } @@ -665,7 +660,7 @@ public class NotebookRestApi { @ZeppelinApi public Response runNoteJobs(@PathParam("noteId") String noteId, @QueryParam("waitToFinish") Boolean waitToFinish) - throws IOException, IllegalArgumentException { + throws IOException, IllegalArgumentException { boolean blocking = waitToFinish == null || waitToFinish; LOG.info("run note jobs {} waitToFinish: {}", noteId, blocking); Note note = notebook.getNote(noteId); @@ -775,22 +770,14 @@ public class NotebookRestApi { throws IOException, IllegalArgumentException { LOG.info("run paragraph job asynchronously {} {} {}", noteId, paragraphId, message); - Note note = notebook.getNote(noteId); - checkIfNoteIsNotNull(note); - checkIfUserCanRun(noteId, "Insufficient privileges you cannot run job for this note"); - Paragraph paragraph = note.getParagraph(paragraphId); - checkIfParagraphIsNotNull(paragraph); - - // handle params if presented - handleParagraphParams(message, note, paragraph); - - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - subject.setRoles(new LinkedList<>(SecurityUtils.getAssociatedRoles())); - - paragraph.setAuthenticationInfo(subject); - note.persist(subject); - - note.run(paragraph.getId()); + Map<String, Object> params = new HashMap<>(); + if (!StringUtils.isEmpty(message)) { + RunParagraphWithParametersRequest request = + RunParagraphWithParametersRequest.fromJson(message); + params = request.getParams(); + } + notebookService.runParagraph(noteId, paragraphId, "", "", params, + new HashMap<String, Object>(), false, getServiceContext(), new RestServiceCallback<>()); return new JsonResponse<>(Status.OK).build(); } @@ -854,16 +841,12 @@ public class NotebookRestApi { @DELETE @Path("job/{noteId}/{paragraphId}") @ZeppelinApi - public Response 
stopParagraph(@PathParam("noteId") String noteId, - @PathParam("paragraphId") String paragraphId) + public Response cancelParagraph(@PathParam("noteId") String noteId, + @PathParam("paragraphId") String paragraphId) throws IOException, IllegalArgumentException { LOG.info("stop paragraph job {} ", noteId); - Note note = notebook.getNote(noteId); - checkIfNoteIsNotNull(note); - checkIfUserCanRun(noteId, "Insufficient privileges you cannot stop paragraph"); - Paragraph p = note.getParagraph(paragraphId); - checkIfParagraphIsNotNull(p); - p.abort(); + notebookService.cancelParagraph(noteId, paragraphId, getServiceContext(), + new RestServiceCallback<Paragraph>()); return new JsonResponse<>(Status.OK).build(); } @@ -913,9 +896,9 @@ public class NotebookRestApi { @Path("cron/{noteId}") @ZeppelinApi public Response removeCronJob(@PathParam("noteId") String noteId) - throws IOException, IllegalArgumentException { + throws IOException, IllegalArgumentException { LOG.info("Remove cron job note {}", noteId); - + Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note); checkIfUserIsOwner(noteId, @@ -942,7 +925,7 @@ public class NotebookRestApi { @Path("cron/{noteId}") @ZeppelinApi public Response getCronJob(@PathParam("noteId") String noteId) - throws IOException, IllegalArgumentException { + throws IOException, IllegalArgumentException { LOG.info("Get cron job note {}", noteId); Note note = notebook.getNote(noteId); @@ -979,7 +962,7 @@ public class NotebookRestApi { /** * Get updated note jobs for job manager - * + * <p> * Return the `Note` change information within the post unix timestamp. 
* * @return JSON with status.OK @@ -1074,4 +1057,25 @@ public class NotebookRestApi { p.setConfig(origConfig); } + + private ServiceContext getServiceContext() { + AuthenticationInfo authInfo = new AuthenticationInfo(SecurityUtils.getPrincipal()); + Set<String> userAndRoles = Sets.newHashSet(); + userAndRoles.add(SecurityUtils.getPrincipal()); + userAndRoles.addAll(SecurityUtils.getAssociatedRoles()); + return new ServiceContext(authInfo, userAndRoles); + } + + private static class RestServiceCallback<T> extends SimpleServiceCallback<T> { + + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + super.onFailure(ex, context); + if (ex instanceof WebApplicationException) { + throw (WebApplicationException) ex; + } else { + throw new IOException(ex); + } + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java index 0c07def..abe08eb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ForbiddenException.java @@ -27,21 +27,10 @@ import org.apache.zeppelin.utils.ExceptionUtils; * UnauthorizedException handler for WebApplicationException. 
*/ public class ForbiddenException extends WebApplicationException { - private static final long serialVersionUID = 4394749068760407567L; - private static final String FORBIDDEN_MSG = "Not allowed to access"; - - public ForbiddenException() { - super(forbiddenJson(FORBIDDEN_MSG)); - } - private static Response forbiddenJson(String message) { return ExceptionUtils.jsonResponseContent(FORBIDDEN, message); } - - public ForbiddenException(Throwable cause, String message) { - super(cause, forbiddenJson(message)); - } - + public ForbiddenException(String message) { super(forbiddenJson(message)); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java deleted file mode 100644 index 7f9c17d..0000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NotFoundException.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.zeppelin.rest.exception; -import static javax.ws.rs.core.Response.Status.NOT_FOUND; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; - -import org.apache.zeppelin.utils.ExceptionUtils; - -/** - * Not Found handler for WebApplicationException. - * - */ -public class NotFoundException extends WebApplicationException { - private static final long serialVersionUID = 2459398393216512293L; - - /** - * Create a HTTP 404 (Not Found) exception. - */ - public NotFoundException() { - super(ExceptionUtils.jsonResponse(NOT_FOUND)); - } - - /** - * Create a HTTP 404 (Not Found) exception. - * @param message the String that is the entity of the 404 response. - */ - public NotFoundException(String message) { - super(notFoundJson(message)); - } - - private static Response notFoundJson(String message) { - return ExceptionUtils.jsonResponseContent(NOT_FOUND, message); - } - - public NotFoundException(Throwable cause) { - super(cause, notFoundJson(cause.getMessage())); - } - - public NotFoundException(Throwable cause, String message) { - super(cause, notFoundJson(message)); - } - -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java new file mode 100644 index 0000000..773d749 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/NoteNotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.exception; + +import org.apache.zeppelin.utils.ExceptionUtils; + +import javax.ws.rs.WebApplicationException; + +import static javax.ws.rs.core.Response.Status.NOT_FOUND; + +public class NoteNotFoundException extends WebApplicationException { + + public NoteNotFoundException(String noteId) { + super(ExceptionUtils.jsonResponseContent(NOT_FOUND, "No such note: " + noteId)); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java new file mode 100644 index 0000000..4ec5ee1 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/ParagraphNotFoundException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.exception; + +import org.apache.zeppelin.utils.ExceptionUtils; + +import javax.ws.rs.WebApplicationException; + +import static javax.ws.rs.core.Response.Status.NOT_FOUND; + +public class ParagraphNotFoundException extends WebApplicationException { + + public ParagraphNotFoundException(String paragraphId) { + super(ExceptionUtils.jsonResponseContent(NOT_FOUND, "No such paragraph: " + paragraphId)); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java index a66bc71..930189f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java @@ -34,7 +34,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.rest.message.InterpreterInstallationRequest; -import org.apache.zeppelin.socket.ServiceCallback; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.sonatype.aether.RepositoryException; @@ -124,11 +123,11 @@ public class InterpreterService { InterpreterInstallationRequest request, DependencyResolver dependencyResolver, Path interpreterDir, - ServiceCallback serviceCallback) { + ServiceCallback<String> serviceCallback) { try { logger.info("Start to download a dependency: {}", request.getName()); if (null != serviceCallback) { - serviceCallback.onStart("Starting to download " + request.getName() + " interpreter"); + serviceCallback.onStart("Starting to download " + request.getName() + " interpreter", null); } dependencyResolver.load(request.getArtifact(), interpreterDir.toFile()); @@ -138,7 +137,7 @@ public class InterpreterService { request.getName(), interpreterDir.toString()); if (null != serviceCallback) { - serviceCallback.onSuccess(request.getName() + " downloaded"); + serviceCallback.onSuccess(request.getName() + " downloaded", null); } } catch (RepositoryException | IOException e) { logger.error("Error while downloading dependencies", e); @@ -151,8 +150,13 @@ public class InterpreterService { e1); } if (null != serviceCallback) { - serviceCallback.onFailure( - "Error while downloading " + request.getName() + " as " + e.getMessage()); + try { + serviceCallback.onFailure( + new Exception("Error while downloading " + request.getName() + " as " + + e.getMessage()), null); + } catch (IOException e1) { + logger.error("ServiceCallback failure", e1); + } } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java new file mode 100644 index 0000000..401420f --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java 
@@ -0,0 +1,793 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.service; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.notebook.Folder; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.NotebookAuthorization; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; +import org.apache.zeppelin.notebook.socket.Message; +import org.apache.zeppelin.rest.exception.BadRequestException; +import org.apache.zeppelin.rest.exception.ForbiddenException; +import org.apache.zeppelin.rest.exception.NoteNotFoundException; +import org.apache.zeppelin.rest.exception.ParagraphNotFoundException; +import org.apache.zeppelin.scheduler.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN; + +/** + * Service class for Notebook related operations. + */ +public class NotebookService { + + private static final Logger LOGGER = LoggerFactory.getLogger(NotebookService.class); + + private ZeppelinConfiguration zConf; + private Notebook notebook; + private NotebookAuthorization notebookAuthorization; + + public NotebookService(Notebook notebook) { + this.notebook = notebook; + this.notebookAuthorization = notebook.getNotebookAuthorization(); + this.zConf = notebook.getConf(); + } + + public Note getHomeNote(ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + String noteId = notebook.getConf().getString(ZEPPELIN_NOTEBOOK_HOMESCREEN); + Note note = null; + if (noteId != null) { + note = notebook.getNote(noteId); + if (note != null) { + if (!checkPermission(noteId, Permission.READER, Message.OP.GET_HOME_NOTE, context, + callback)) { + return null; + } + } else { + callback.onFailure(new Exception("configured HomePage is not existed"), context); + } + } + callback.onSuccess(note, context); + return note; + } + + public Note getNote(String noteId, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return null; + } + + if (!checkPermission(noteId, Permission.READER, Message.OP.GET_NOTE, context, + callback)) { + return null; + } + if (note.isPersonalizedMode()) { + note = note.getUserNote(context.getAutheInfo().getUser()); + } + callback.onSuccess(note, context); + return note; + } + + + public Note createNote(String defaultInterpreterGroup, + String noteName, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + if (defaultInterpreterGroup == null) { + defaultInterpreterGroup = zConf.getString( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT); + 
} + if (noteName == null) { + noteName = "Untitled Note"; + } + try { + Note note = notebook.createNote(defaultInterpreterGroup, context.getAutheInfo()); + note.addNewParagraph(context.getAutheInfo()); // it's an empty note. so add one paragraph + note.setName(noteName); + note.setCronSupported(notebook.getConf()); + note.persist(context.getAutheInfo()); + callback.onSuccess(note, context); + return note; + } catch (IOException e) { + callback.onFailure(new IOException("Fail to create Note", e), context); + return null; + } + } + + + public void removeNote(String noteId, + ServiceContext context, + ServiceCallback<String> callback) throws IOException { + if (!checkPermission(noteId, Permission.OWNER, Message.OP.DEL_NOTE, context, callback)) { + return; + } + if (notebook.getNote(noteId) != null) { + notebook.removeNote(noteId, context.getAutheInfo()); + callback.onSuccess("Delete note successfully", context); + } else { + callback.onFailure(new NoteNotFoundException(noteId), context); + } + } + + public List<Map<String, String>> listNotes(boolean needsReload, + ServiceContext context, + ServiceCallback<List<Map<String, String>>> callback) + throws IOException { + + ZeppelinConfiguration conf = notebook.getConf(); + String homeScreenNoteId = conf.getString(ZEPPELIN_NOTEBOOK_HOMESCREEN); + boolean hideHomeScreenNotebookFromList = + conf.getBoolean(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); + if (needsReload) { + try { + notebook.reloadAllNotes(context.getAutheInfo()); + } catch (IOException e) { + LOGGER.error("Fail to reload notes from repository", e); + } + } + + List<Note> notes = notebook.getAllNotes(context.getUserAndRoles()); + List<Map<String, String>> notesInfo = new LinkedList<>(); + for (Note note : notes) { + Map<String, String> info = new HashMap<>(); + if (hideHomeScreenNotebookFromList && note.getId().equals(homeScreenNoteId)) { + continue; + } + info.put("id", note.getId()); + info.put("name", note.getName()); + 
notesInfo.add(info); + } + + callback.onSuccess(notesInfo, context); + return notesInfo; + } + + public void renameNote(String noteId, + String newNoteName, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.NOTE_RENAME, context, callback)) { + return; + } + + Note note = notebook.getNote(noteId); + if (note != null) { + note.setName(newNoteName); + note.setCronSupported(notebook.getConf()); + note.persist(context.getAutheInfo()); + callback.onSuccess(note, context); + } else { + callback.onFailure(new NoteNotFoundException(noteId), context); + } + + } + + public Note cloneNote(String noteId, + String newNoteName, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + Note newNote = notebook.cloneNote(noteId, newNoteName, context.getAutheInfo()); + callback.onSuccess(newNote, context); + return newNote; + } + + public Note importNote(String noteName, + String noteJson, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + Note note = notebook.importNote(noteJson, noteName, context.getAutheInfo()); + note.persist(context.getAutheInfo()); + callback.onSuccess(note, context); + return note; + } + + public boolean runParagraph(String noteId, + String paragraphId, + String title, + String text, + Map<String, Object> params, + Map<String, Object> config, + boolean isRunAll, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + + if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_PARAGRAPH, context, callback)) { + return false; + } + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return false; + } + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + callback.onFailure(new ParagraphNotFoundException(paragraphId), context); + return false; + } + if (!p.isEnabled()) { + if (!isRunAll) { 
+ callback.onFailure(new IOException("paragraph is disabled."), context); + } + return false; + } + p.setText(text); + p.setTitle(title); + p.setAuthenticationInfo(context.getAutheInfo()); + p.settings.setParams(params); + p.setConfig(config); + + if (note.isPersonalizedMode()) { + p = note.getParagraph(paragraphId); + p.setText(text); + p.setTitle(title); + p.setAuthenticationInfo(context.getAutheInfo()); + p.settings.setParams(params); + p.setConfig(config); + } + + try { + note.persist(p.getAuthenticationInfo()); + boolean result = note.run(p.getId(), false); + callback.onSuccess(p, context); + return result; + } catch (Exception ex) { + LOGGER.error("Exception from run", ex); + p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex); + p.setStatus(Job.Status.ERROR); + callback.onFailure(new Exception("Fail to run paragraph " + paragraphId, ex), context); + return false; + } + } + + public void runAllParagraphs(String noteId, + List<Map<String, Object>> paragraphs, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.RUNNER, Message.OP.RUN_ALL_PARAGRAPHS, context, + callback)) { + return; + } + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + for (Map<String, Object> raw : paragraphs) { + String paragraphId = (String) raw.get("id"); + if (paragraphId == null) { + continue; + } + String text = (String) raw.get("paragraph"); + String title = (String) raw.get("title"); + Map<String, Object> params = (Map<String, Object>) raw.get("params"); + Map<String, Object> config = (Map<String, Object>) raw.get("config"); + + if (runParagraph(noteId, paragraphId, title, text, params, config, true, context, callback)) { + // stop execution when one paragraph fails. 
+ break; + } + } + } + + public void cancelParagraph(String noteId, + String paragraphId, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.RUNNER, Message.OP.CANCEL_PARAGRAPH, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + throw new NoteNotFoundException(noteId); + } + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + throw new ParagraphNotFoundException(paragraphId); + } + p.abort(); + callback.onSuccess(p, context); + } + + public void moveParagraph(String noteId, + String paragraphId, + int newIndex, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.MOVE_PARAGRAPH, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + throw new NoteNotFoundException(noteId); + } + if (note.getParagraph(paragraphId) == null) { + throw new ParagraphNotFoundException(paragraphId); + } + if (newIndex >= note.getParagraphCount()) { + callback.onFailure(new BadRequestException("newIndex " + newIndex + " is out of bounds"), + context); + return; + } + note.moveParagraph(paragraphId, newIndex); + note.persist(context.getAutheInfo()); + callback.onSuccess(note.getParagraph(newIndex), context); + } + + public void removeParagraph(String noteId, + String paragraphId, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_REMOVE, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + throw new NoteNotFoundException(noteId); + } + if (note.getParagraph(paragraphId) == null) { + throw new ParagraphNotFoundException(paragraphId); + } + Paragraph p = note.removeParagraph(context.getAutheInfo().getUser(), paragraphId); + note.persist(context.getAutheInfo()); 
+ callback.onSuccess(p, context); + } + + public Paragraph insertParagraph(String noteId, + int index, + Map<String, Object> config, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.INSERT_PARAGRAPH, context, + callback)) { + return null; + } + Note note = notebook.getNote(noteId); + if (note == null) { + throw new NoteNotFoundException(noteId); + } + Paragraph newPara = note.insertNewParagraph(index, context.getAutheInfo()); + newPara.setConfig(config); + note.persist(context.getAutheInfo()); + callback.onSuccess(newPara, context); + return newPara; + } + + public void restoreNote(String noteId, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.RESTORE_NOTE, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + //restore cron + Map<String, Object> config = note.getConfig(); + if (config.get("cron") != null) { + notebook.refreshCron(note.getId()); + } + + if (note.isTrash()) { + String newName = note.getName().replaceFirst(Folder.TRASH_FOLDER_ID + "/", ""); + renameNote(noteId, newName, context, callback); + } else { + callback.onFailure(new IOException(String.format("Trying to restore a note {} " + + "which is not in Trash", noteId)), context); + } + } + + public void updateParagraph(String noteId, + String paragraphId, + String title, + String text, + Map<String, Object> params, + Map<String, Object> config, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.COMMIT_PARAGRAPH, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + 
Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + callback.onFailure(new ParagraphNotFoundException(paragraphId), context); + return; + } + + p.settings.setParams(params); + p.setConfig(config); + p.setTitle(title); + p.setText(text); + if (note.isPersonalizedMode()) { + p = p.getUserParagraph(context.getAutheInfo().getUser()); + p.settings.setParams(params); + p.setConfig(config); + p.setTitle(title); + p.setText(text); + } + note.persist(context.getAutheInfo()); + callback.onSuccess(p, context); + } + + public void clearParagraphOutput(String noteId, + String paragraphId, + ServiceContext context, + ServiceCallback<Paragraph> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_CLEAR_OUTPUT, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + callback.onFailure(new ParagraphNotFoundException(paragraphId), context); + return; + } + Paragraph returnedParagraph = null; + if (note.isPersonalizedMode()) { + returnedParagraph = note.clearPersonalizedParagraphOutput(paragraphId, + context.getAutheInfo().getUser()); + } else { + note.clearParagraphOutput(paragraphId); + returnedParagraph = note.getParagraph(paragraphId); + } + callback.onSuccess(returnedParagraph, context); + } + + public void clearAllParagraphOutput(String noteId, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.PARAGRAPH_CLEAR_ALL_OUTPUT, context, + callback)) { + return; + } + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + note.clearAllParagraphOutput(); + callback.onSuccess(note, context); + } + + + + public void updateNote(String noteId, + String name, + 
Map<String, Object> config, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.NOTE_UPDATE, context, + callback)) { + return; + } + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + if (!(Boolean) note.getConfig().get("isZeppelinNotebookCronEnable")) { + if (config.get("cron") != null) { + config.remove("cron"); + } + } + boolean cronUpdated = isCronUpdated(config, note.getConfig()); + note.setName(name); + note.setConfig(config); + if (cronUpdated) { + notebook.refreshCron(note.getId()); + } + + note.persist(context.getAutheInfo()); + callback.onSuccess(note, context); + } + + + private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) { + boolean cronUpdated = false; + if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron") + .equals(configB.get("cron"))) { + cronUpdated = true; + } else if (configA.get("cron") == null && configB.get("cron") == null) { + cronUpdated = false; + } else if (configA.get("cron") != null || configB.get("cron") != null) { + cronUpdated = true; + } + + return cronUpdated; + } + + public void saveNoteForms(String noteId, + Map<String, Object> noteParams, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + if (!checkPermission(noteId, Permission.WRITER, Message.OP.SAVE_NOTE_FORMS, context, + callback)) { + return; + } + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + note.setNoteParams(noteParams); + note.persist(context.getAutheInfo()); + callback.onSuccess(note, context); + } + + public void removeNoteForms(String noteId, + String formName, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + Note note = notebook.getNote(noteId); + if 
(note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + if (!checkPermission(noteId, Permission.WRITER, Message.OP.REMOVE_NOTE_FORMS, context, + callback)) { + return; + } + + note.getNoteForms().remove(formName); + note.getNoteParams().remove(formName); + note.persist(context.getAutheInfo()); + callback.onSuccess(note, context); + } + + public NotebookRepoWithVersionControl.Revision checkpointNote( + String noteId, + String commitMessage, + ServiceContext context, + ServiceCallback<NotebookRepoWithVersionControl.Revision> callback) throws IOException { + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return null; + } + + if (!checkPermission(noteId, Permission.WRITER, Message.OP.REMOVE_NOTE_FORMS, context, + callback)) { + return null; + } + + NotebookRepoWithVersionControl.Revision revision = + notebook.checkpointNote(noteId, commitMessage, context.getAutheInfo()); + callback.onSuccess(revision, context); + return revision; + } + + public List<NotebookRepoWithVersionControl.Revision> listRevisionHistory( + String noteId, + ServiceContext context, + ServiceCallback<List<NotebookRepoWithVersionControl.Revision>> callback) throws IOException { + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return null; + } + + // TODO(zjffdu) Disable checking permission for now, otherwise zeppelin will send 2 AUTH_INFO + // message to frontend when frontend try to get note without proper privilege. 
+ // if (!checkPermission(noteId, Permission.READER, Message.OP.LIST_REVISION_HISTORY, context, + // callback)) { + // return null; + // } + List<NotebookRepoWithVersionControl.Revision> revisions = + notebook.listRevisionHistory(noteId, context.getAutheInfo()); + callback.onSuccess(revisions, context); + return revisions; + } + + + public Note setNoteRevision(String noteId, + String revisionId, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return null; + } + + if (!checkPermission(noteId, Permission.WRITER, Message.OP.SET_NOTE_REVISION, context, + callback)) { + return null; + } + + try { + Note resultNote = notebook.setNoteRevision(noteId, revisionId, context.getAutheInfo()); + callback.onSuccess(resultNote, context); + return resultNote; + } catch (Exception e) { + callback.onFailure(new IOException("Fail to set given note revision", e), context); + return null; + } + } + + public void getNotebyRevision(String noteId, + String revisionId, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + if (!checkPermission(noteId, Permission.READER, Message.OP.NOTE_REVISION, context, + callback)) { + return; + } + Note revisionNote = notebook.getNoteByRevision(noteId, revisionId, context.getAutheInfo()); + callback.onSuccess(revisionNote, context); + } + + public void getNoteByRevisionForCompare(String noteId, + String revisionId, + ServiceContext context, + ServiceCallback<Note> callback) throws IOException { + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return; + } + + if (!checkPermission(noteId, Permission.READER, Message.OP.NOTE_REVISION_FOR_COMPARE, 
context, + callback)) { + return; + } + Note revisionNote = null; + if (revisionId.equals("Head")) { + revisionNote = notebook.getNote(noteId); + } else { + revisionNote = notebook.getNoteByRevision(noteId, revisionId, context.getAutheInfo()); + } + callback.onSuccess(revisionNote, context); + } + + public List<InterpreterCompletion> completion( + String noteId, + String paragraphId, + String buffer, + int cursor, + ServiceContext context, + ServiceCallback<List<InterpreterCompletion>> callback) throws IOException { + + Note note = notebook.getNote(noteId); + if (note == null) { + callback.onFailure(new NoteNotFoundException(noteId), context); + return null; + } + + if (!checkPermission(noteId, Permission.WRITER, Message.OP.COMPLETION, context, + callback)) { + return null; + } + + try { + List<InterpreterCompletion> completions = note.completion(paragraphId, buffer, cursor); + callback.onSuccess(completions, context); + return completions; + } catch (RuntimeException e) { + callback.onFailure(new IOException("Fail to get completion", e), context); + return null; + } + } + + + + + enum Permission { + READER, + WRITER, + RUNNER, + OWNER, + } + + /** + * Return null when it is allowed, otherwise return the error message which could be + * propagated to frontend + * + * @param noteId + * @param context + * @param permission + * @param op + * @return + */ + private <T> boolean checkPermission(String noteId, + Permission permission, + Message.OP op, + ServiceContext context, + ServiceCallback<T> callback) throws IOException { + boolean isAllowed = false; + Set<String> allowed = null; + switch (permission) { + case READER: + isAllowed = notebookAuthorization.isReader(noteId, context.getUserAndRoles()); + allowed = notebookAuthorization.getReaders(noteId); + break; + case WRITER: + isAllowed = notebookAuthorization.isWriter(noteId, context.getUserAndRoles()); + allowed = notebookAuthorization.getWriters(noteId); + break; + case RUNNER: + isAllowed = 
notebookAuthorization.isRunner(noteId, context.getUserAndRoles()); + allowed = notebookAuthorization.getRunners(noteId); + break; + case OWNER: + isAllowed = notebookAuthorization.isOwner(noteId, context.getUserAndRoles()); + allowed = notebookAuthorization.getOwners(noteId); + break; + } + if (isAllowed) { + return true; + } else { + String errorMsg = "Insufficient privileges to " + permission + " note.\n" + + "Allowed users or roles: " + allowed + "\n" + "But the user " + + context.getAutheInfo().getUser() + " belongs to: " + context.getUserAndRoles(); + callback.onFailure(new ForbiddenException(errorMsg), context); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java new file mode 100644 index 0000000..fd5af9e --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceCallback.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.service; + +import java.io.IOException; + +/** + * This will be used by service classes as callback mechanism. + */ +public interface ServiceCallback<T> { + + /** + * Called when this service call is starting + * @param message + * @param context + * @throws IOException + */ + void onStart(String message, ServiceContext context) throws IOException; + + /** + * Called when this service call is succeed + * @param result + * @param context + * @throws IOException + */ + void onSuccess(T result, ServiceContext context) throws IOException; + + /** + * Called when this service call is failed + * @param ex + * @param context + * @throws IOException + */ + void onFailure(Exception ex, ServiceContext context) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java new file mode 100644 index 0000000..3db8bf8 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/ServiceContext.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.service; + +import org.apache.zeppelin.user.AuthenticationInfo; + +import java.util.Set; + +/** + * Context info for Service call + */ +public class ServiceContext { + + private AuthenticationInfo autheInfo; + private Set<String> userAndRoles; + + public ServiceContext(AuthenticationInfo authInfo, Set<String> userAndRoles) { + this.autheInfo = authInfo; + this.userAndRoles = userAndRoles; + } + + public AuthenticationInfo getAutheInfo() { + return autheInfo; + } + + public Set<String> getUserAndRoles() { + return userAndRoles; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6beb1bb3/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java new file mode 100644 index 0000000..6957707 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/SimpleServiceCallback.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */


package org.apache.zeppelin.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * {@link ServiceCallback} implementation that only logs each notification: start and success
 * at debug level, failure at warn level. The {@code context} argument is not used.
 *
 * @param <T> type of the result handed to {@link #onSuccess}
 */
public class SimpleServiceCallback<T> implements ServiceCallback<T> {

  // the logger is a per-class constant and is never reassigned
  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleServiceCallback.class);

  /**
   * Logs the start message at debug level.
   */
  @Override
  public void onStart(String message, ServiceContext context) throws IOException {
    LOGGER.debug(message);
  }

  /**
   * Logs a fixed success message at debug level; the result itself is not logged.
   */
  @Override
  public void onSuccess(T result, ServiceContext context) throws IOException {
    LOGGER.debug("OP is succeeded");
  }

  /**
   * Logs the message of the exception at warn level (without its stack trace).
   */
  @Override
  public void onFailure(Exception ex, ServiceContext context) throws IOException {
    LOGGER.warn(ex.getMessage());
  }

}