http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java deleted file mode 100644 index fd9960b..0000000 --- a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java +++ /dev/null @@ -1,483 +0,0 @@ -package com.nflabs.zeppelin.socket; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import org.java_websocket.WebSocket; -import org.java_websocket.handshake.ClientHandshake; -import org.java_websocket.server.WebSocketServer; -import org.quartz.SchedulerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Strings; -import com.google.gson.Gson; -import com.nflabs.zeppelin.notebook.JobListenerFactory; -import com.nflabs.zeppelin.notebook.Note; -import com.nflabs.zeppelin.notebook.Notebook; -import com.nflabs.zeppelin.notebook.Paragraph; -import com.nflabs.zeppelin.scheduler.Job; -import com.nflabs.zeppelin.scheduler.Job.Status; -import com.nflabs.zeppelin.scheduler.JobListener; -import com.nflabs.zeppelin.server.ZeppelinServer; -import com.nflabs.zeppelin.socket.Message.OP; - -/** - * Zeppelin websocket service. - * - * @author anthonycorbacho - */ -public class NotebookServer extends WebSocketServer implements JobListenerFactory { - - private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - private static final int DEFAULT_PORT = 8282; - - private static void creatingwebSocketServerLog(int port) { - LOG.info("Create zeppelin websocket on port {}", port); - } - - Gson gson = new Gson(); - Map<String, List<WebSocket>> noteSocketMap = new HashMap<String, List<WebSocket>>(); - List<WebSocket> connectedSockets = new LinkedList<WebSocket>(); - - public NotebookServer() { - super(new InetSocketAddress(DEFAULT_PORT)); - creatingwebSocketServerLog(DEFAULT_PORT); - } - - public NotebookServer(int port) { - super(new InetSocketAddress(port)); - creatingwebSocketServerLog(port); - } - - private Notebook notebook() { - return ZeppelinServer.notebook; - } - - @Override - public void onOpen(WebSocket conn, ClientHandshake handshake) { - LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); - synchronized (connectedSockets) { - connectedSockets.add(conn); - } - } - - @Override - public void onMessage(WebSocket conn, String msg) { - Notebook notebook = notebook(); - try { - Message messagereceived = deserializeMessage(msg); - LOG.info("RECEIVE << " + messagereceived.op); - /** Lets be elegant here */ - switch (messagereceived.op) { - case LIST_NOTES: - broadcastNoteList(); - break; - case GET_NOTE: - sendNote(conn, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, notebook); - break; - case DEL_NOTE: - removeNote(conn, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, notebook, messagereceived); - break; - default: - broadcastNoteList(); - break; - } - } catch (Exception e) { - LOG.error("Can't handle message", e); - } - } - - @Override - public void onClose(WebSocket conn, int code, String reason, boolean remote) { - LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); - removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } - } - - @Override - public void onError(WebSocket conn, Exception message) { - removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } - } - - private Message deserializeMessage(String msg) { - Message m = gson.fromJson(msg, Message.class); - return m; - } - - private String serializeMessage(Message m) { - return gson.toJson(m); - } - - private void addConnectionToNote(String noteId, WebSocket socket) { - synchronized (noteSocketMap) { - removeConnectionFromAllNote(socket); // make sure a socket relates only a single note. - List<WebSocket> socketList = noteSocketMap.get(noteId); - if (socketList == null) { - socketList = new LinkedList<WebSocket>(); - noteSocketMap.put(noteId, socketList); - } - - if (socketList.contains(socket) == false) { - socketList.add(socket); - } - } - } - - private void removeConnectionFromNote(String noteId, WebSocket socket) { - synchronized (noteSocketMap) { - List<WebSocket> socketList = noteSocketMap.get(noteId); - if (socketList != null) { - socketList.remove(socket); - } - } - } - - private void removeNote(String noteId) { - synchronized (noteSocketMap) { - List<WebSocket> socketList = noteSocketMap.remove(noteId); - } - } - - private void removeConnectionFromAllNote(WebSocket socket) { - synchronized (noteSocketMap) { - Set<String> keys = noteSocketMap.keySet(); - for (String noteId : keys) { - removeConnectionFromNote(noteId, socket); - } - } - } - - private String getOpenNoteId(WebSocket socket) { - String id = null; - synchronized (noteSocketMap) { - Set<String> keys = noteSocketMap.keySet(); - for (String noteId : keys) { - List<WebSocket> sockets = noteSocketMap.get(noteId); - if (sockets.contains(socket)) { - id = noteId; - } - } - } - return id; - } - - private void broadcast(String noteId, Message m) { - LOG.info("SEND >> " + m.op); - synchronized (noteSocketMap) { - List<WebSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { - return; - } - for (WebSocket conn : socketLists) { - conn.send(serializeMessage(m)); - } - } - } - - private void broadcastAll(Message m) { - synchronized (connectedSockets) { - for (WebSocket conn : connectedSockets) { - conn.send(serializeMessage(m)); - } - } - } - - private void broadcastNote(Note note) { - broadcast(note.id(), new Message(OP.NOTE).put("note", note)); - } - - private void broadcastNoteList() { - Notebook notebook = notebook(); - List<Note> notes = notebook.getAllNotes(); - List<Map<String, String>> notesInfo = new LinkedList<Map<String, String>>(); - for (Note note : notes) { - Map<String, String> info = new HashMap<String, String>(); - info.put("id", note.id()); - info.put("name", note.getName()); - notesInfo.add(info); - } - broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); - } - - private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) { - String noteId = (String) fromMessage.get("id"); - if (noteId == null) { - return; - } - Note note = notebook.getNote(noteId); - if (note != null) { - addConnectionToNote(note.id(), conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); - } - } - - private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { - String noteId = (String) fromMessage.get("id"); - String name = (String) fromMessage.get("name"); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); - if (noteId == null) { - return; - } - if (config == null) { - return; - } - Note note = notebook.getNote(noteId); - if (note != null) { - boolean cronUpdated = isCronUpdated(config, note.getConfig()); - note.setName(name); - note.setConfig(config); - - if (cronUpdated) { - notebook.refreshCron(note.id()); - } - note.persist(); - - broadcastNote(note); - broadcastNoteList(); - } - } - - private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) { - boolean cronUpdated = false; - if (configA.get("cron") != null && configB.get("cron") != null - && configA.get("cron").equals(configB.get("cron"))) { - cronUpdated = true; - } else if (configA.get("cron") == null && configB.get("cron") == null) { - cronUpdated = false; - } else if (configA.get("cron") != null || configB.get("cron") != null) { - cronUpdated = true; - } - return cronUpdated; - } - - private void createNote(WebSocket conn, Notebook notebook) throws IOException { - Note note = notebook.createNote(); - note.addParagraph(); // it's an empty note. so add one paragraph - note.persist(); - broadcastNote(note); - broadcastNoteList(); - } - - private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - String noteId = (String) fromMessage.get("id"); - if (noteId == null) { - return; - } - Note note = notebook.getNote(noteId); - note.unpersist(); - notebook.removeNote(noteId); - removeNote(noteId); - broadcastNoteList(); - } - - private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - String paragraphId = (String) fromMessage.get("id"); - if (paragraphId == null) { - return; - } - Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); - final Note note = notebook.getNote(getOpenNoteId(conn)); - Paragraph p = note.getParagraph(paragraphId); - p.settings.setParams(params); - p.setConfig(config); - p.setTitle((String) fromMessage.get("title")); - p.setText((String) fromMessage.get("paragraph")); - note.persist(); - broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); - } - - private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final String paragraphId = (String) fromMessage.get("id"); - if (paragraphId == null) { - return; - } - final Note note = notebook.getNote(getOpenNoteId(conn)); - /** We dont want to remove the last paragraph */ - if (!note.isLastParagraph(paragraphId)) { - note.removeParagraph(paragraphId); - note.persist(); - broadcastNote(note); - } - } - - private void completion(WebSocket conn, Notebook notebook, Message fromMessage) { - String paragraphId = (String) fromMessage.get("id"); - String buffer = (String) fromMessage.get("buf"); - int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); - Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); - - if (paragraphId == null) { - conn.send(serializeMessage(resp)); - return; - } - - final Note note = notebook.getNote(getOpenNoteId(conn)); - List<String> candidates = note.completion(paragraphId, buffer, cursor); - resp.put("completions", candidates); - conn.send(serializeMessage(resp)); - } - - private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final String paragraphId = (String) fromMessage.get("id"); - if (paragraphId == null) { - return; - } - - final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); - final Note note = notebook.getNote(getOpenNoteId(conn)); - note.moveParagraph(paragraphId, newIndex); - note.persist(); - broadcastNote(note); - } - - private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); - - final Note note = notebook.getNote(getOpenNoteId(conn)); - note.insertParagraph(index); - note.persist(); - broadcastNote(note); - } - - - private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final String paragraphId = (String) fromMessage.get("id"); - if (paragraphId == null) { - return; - } - - final Note note = notebook.getNote(getOpenNoteId(conn)); - Paragraph p = note.getParagraph(paragraphId); - p.abort(); - } - - private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final String paragraphId = (String) fromMessage.get("id"); - if (paragraphId == null) { - return; - } - final Note note = notebook.getNote(getOpenNoteId(conn)); - Paragraph p = note.getParagraph(paragraphId); - String text = (String) fromMessage.get("paragraph"); - p.setText(text); - p.setTitle((String) fromMessage.get("title")); - Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); - p.settings.setParams(params); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); - p.setConfig(config); - - // if it's the last paragraph, let's add a new one - boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId()); - if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { - note.addParagraph(); - } - note.persist(); - broadcastNote(note); - - try { - note.run(paragraphId); - } - catch (Exception ex) { - LOG.error("Exception from run", ex); - if (p != null) { - p.setReturn(new InterpreterResult( - InterpreterResult.Code.ERROR, ex.getMessage()), ex); - p.setStatus(Status.ERROR); - } - } - } - - /** - * Need description here. - * - */ - public static class ParagraphJobListener implements JobListener { - private NotebookServer notebookServer; - private Note note; - - public ParagraphJobListener(NotebookServer notebookServer, Note note) { - this.notebookServer = notebookServer; - this.note = note; - } - - @Override - public void onProgressUpdate(Job job, int progress) { - notebookServer.broadcast(note.id(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); - } - - @Override - public void beforeStatusChange(Job job, Status before, Status after) {} - - @Override - public void afterStatusChange(Job job, Status before, Status after) { - if (after == Status.ERROR) { - job.getException().printStackTrace(); - } - if (job.isTerminated()) { - LOG.info("Job {} is finished", job.getId()); - try { - note.persist(); - } catch (IOException e) { - e.printStackTrace(); - } - } - notebookServer.broadcastNote(note); - } - } - - @Override - public JobListener getParagraphJobListener(Note note) { - return new ParagraphJobListener(this, note); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java deleted file mode 100644 index b84e1d0..0000000 --- a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.nflabs.zeppelin.socket; - -import org.java_websocket.server.DefaultSSLWebSocketServerFactory; -import org.java_websocket.SSLSocketChannel2; - -import java.io.IOException; - -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; - -import java.util.concurrent.ExecutorService; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -/** - * Extension of the java_websocket library's DefaultSslWebSocketServerFactory - * to require client side authentication during the SSL handshake - */ -public class SslWebSocketServerFactory - extends DefaultSSLWebSocketServerFactory { - - protected boolean needClientAuth; - - public SslWebSocketServerFactory(SSLContext sslcontext) { - super(sslcontext); - initAttributes(); - } - - public SslWebSocketServerFactory( - SSLContext sslcontext, - ExecutorService exec) { - - super(sslcontext, exec); - initAttributes(); - } - - protected void initAttributes() { - this.needClientAuth = false; - } - - @Override - public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key) - throws IOException { - - SSLEngine sslEngine = sslcontext.createSSLEngine(); - sslEngine.setUseClientMode(false); - sslEngine.setNeedClientAuth(needClientAuth); - return new SSLSocketChannel2( channel, sslEngine, exec, key ); - } - - public boolean getNeedClientAuth() { - return needClientAuth; - } - - public void setNeedClientAuth(boolean needClientAuth) { - this.needClientAuth = needClientAuth; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java new file mode 100644 index 0000000..1e2ade6 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; +import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest; +import org.apache.zeppelin.server.JsonResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; + +/** + * Interpreter Rest API + * + */ +@Path("/interpreter") +@Produces("application/json") +@Api(value = "/interpreter", description = "Zeppelin Interpreter REST API") +public class InterpreterRestApi { + Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class); + + private InterpreterFactory interpreterFactory; + + Gson gson = new Gson(); + + public InterpreterRestApi() { + + } + + public InterpreterRestApi(InterpreterFactory interpreterFactory) { + this.interpreterFactory = interpreterFactory; + } + + /** + * List all interpreter settings + * @return + */ + @GET + @Path("setting") + @ApiOperation(httpMethod = "GET", value = "List all interpreter setting") + @ApiResponses(value = {@ApiResponse(code = 500, message = "When something goes wrong")}) + public Response listSettings() { + List<InterpreterSetting> interpreterSettings = null; + interpreterSettings = interpreterFactory.get(); + return new JsonResponse(Status.OK, "", interpreterSettings).build(); + } + + /** + * Add new interpreter setting + * @param message + * @return + * @throws IOException + * @throws InterpreterException + */ + @POST + @Path("setting") + @ApiOperation(httpMethod = "GET", value = "Create new interpreter setting") + @ApiResponses(value = {@ApiResponse(code = 201, message = "On success")}) + public Response newSettings(String message) throws InterpreterException, IOException { + NewInterpreterSettingRequest request = gson.fromJson(message, + NewInterpreterSettingRequest.class); + Properties p = new Properties(); + p.putAll(request.getProperties()); + interpreterFactory.add(request.getName(), request.getGroup(), request.getOption(), p); + return new JsonResponse(Status.CREATED, "").build(); + } + + @PUT + @Path("setting/{settingId}") + public Response updateSetting(String message, @PathParam("settingId") String settingId) { + logger.info("Update interpreterSetting {}", settingId); + + try { + UpdateInterpreterSettingRequest p = gson.fromJson(message, + UpdateInterpreterSettingRequest.class); + interpreterFactory.setPropertyAndRestart(settingId, p.getOption(), p.getProperties()); + } catch (InterpreterException e) { + return new JsonResponse(Status.NOT_FOUND, e.getMessage(), e).build(); + } catch (IOException e) { + return new JsonResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage(), e).build(); + } + InterpreterSetting setting = interpreterFactory.get(settingId); + if (setting == null) { + return new JsonResponse(Status.NOT_FOUND, "", settingId).build(); + } + return new JsonResponse(Status.OK, "", setting).build(); + } + + @DELETE + @Path("setting/{settingId}") + @ApiOperation(httpMethod = "GET", value = "Remove interpreter setting") + @ApiResponses(value = {@ApiResponse(code = 500, message = "When something goes wrong")}) + public Response removeSetting(@PathParam("settingId") String settingId) throws IOException { + logger.info("Remove interpreterSetting {}", settingId); + interpreterFactory.remove(settingId); + return new JsonResponse(Status.OK).build(); + } + + @PUT + @Path("setting/restart/{settingId}") + @ApiOperation(httpMethod = "GET", value = "restart interpreter setting") + @ApiResponses(value = { + @ApiResponse(code = 404, message = "Not found")}) + public Response restartSetting(@PathParam("settingId") String settingId) { + logger.info("Restart interpreterSetting {}", settingId); + try { + interpreterFactory.restart(settingId); + } catch (InterpreterException e) { + return new JsonResponse(Status.NOT_FOUND, e.getMessage(), e).build(); + } + InterpreterSetting setting = interpreterFactory.get(settingId); + if (setting == null) { + return new JsonResponse(Status.NOT_FOUND, "", settingId).build(); + } + return new JsonResponse(Status.OK, "", setting).build(); + } + + /** + * List all available interpreters by group + */ + @GET + @ApiOperation(httpMethod = "GET", value = "List all available interpreters") + @ApiResponses(value = { + @ApiResponse(code = 500, message = "When something goes wrong")}) + public Response listInterpreter(String message) { + Map<String, RegisteredInterpreter> m = Interpreter.registeredInterpreters; + return new JsonResponse(Status.OK, "", m).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java new file mode 100644 index 0000000..1397ac1 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Response wrapper. + * + * @author anthonycorbacho + * + */ +@XmlRootElement +public class NotebookResponse { + private String msg; + + public NotebookResponse() {} + + public NotebookResponse(String msg) { + this.msg = msg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java new file mode 100644 index 0000000..8a933f7 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind; +import org.apache.zeppelin.server.JsonResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +/** + * Rest api endpoint for the noteBook. + */ +@Path("/notebook") +@Produces("application/json") +public class NotebookRestApi { + Logger logger = LoggerFactory.getLogger(NotebookRestApi.class); + Gson gson = new Gson(); + private Notebook notebook; + + public NotebookRestApi() {} + + public NotebookRestApi(Notebook notebook) { + this.notebook = notebook; + } + + /** + * bind a setting to note + * @throws IOException + */ + @PUT + @Path("interpreter/bind/{noteId}") + public Response bind(@PathParam("noteId") String noteId, String req) throws IOException { + List<String> settingIdList = gson.fromJson(req, new TypeToken<List<String>>(){}.getType()); + notebook.bindInterpretersToNote(noteId, settingIdList); + return new JsonResponse(Status.OK).build(); + } + + /** + * list binded setting + */ + @GET + @Path("interpreter/bind/{noteId}") + public Response bind(@PathParam("noteId") String noteId) { + List<InterpreterSettingListForNoteBind> settingList + = new LinkedList<InterpreterSettingListForNoteBind>(); + + List<InterpreterSetting> selectedSettings = notebook.getBindedInterpreterSettings(noteId); + for (InterpreterSetting setting : selectedSettings) { + settingList.add(new InterpreterSettingListForNoteBind( + setting.id(), + setting.getName(), + setting.getGroup(), + setting.getInterpreterGroup(), + true) + ); + } + + List<InterpreterSetting> availableSettings = notebook.getInterpreterFactory().get(); + for (InterpreterSetting setting : availableSettings) { + boolean selected = false; + for (InterpreterSetting selectedSetting : selectedSettings) { + if (selectedSetting.id().equals(setting.id())) { + selected = true; + break; + } + } + + if (!selected) { + settingList.add(new InterpreterSettingListForNoteBind( + setting.id(), + setting.getName(), + setting.getGroup(), + setting.getInterpreterGroup(), + false) + ); + } + } + return new JsonResponse(Status.OK, "", settingList).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java new file mode 100644 index 0000000..4fc47a4 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; + +import com.wordnik.swagger.annotations.Api; + +/** + * Zeppelin root rest api endpoint. + * + * @author anthonycorbacho + * @since 0.3.4 + */ +@Path("/") +@Api(value = "/", description = "Zeppelin REST API root") +public class ZeppelinRestApi { + + /** + * Required by Swagger. + */ + public ZeppelinRestApi() { + super(); + } + + /** + * Get the root endpoint Return always 200. + * + * @return 200 response + */ + @GET + public Response getRoot() { + return Response.ok().build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java new file mode 100644 index 0000000..b74054c --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import java.util.List; + +import org.apache.zeppelin.interpreter.Interpreter; + +/** + * InterpreterSetting information for binding + */ +public class InterpreterSettingListForNoteBind { + String id; + String name; + String group; + private boolean selected; + private List<Interpreter> interpreters; + + public InterpreterSettingListForNoteBind(String id, String name, + String group, List<Interpreter> interpreters, boolean selected) { + super(); + this.id = id; + this.name = name; + this.group = group; + this.interpreters = interpreters; + this.selected = selected; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public List<Interpreter> getInterpreterNames() { + return interpreters; + } + + public void setInterpreterNames(List<Interpreter> interpreters) { + this.interpreters = interpreters; + } + + public boolean isSelected() { + return selected; + } + + public void setSelected(boolean selected) { + this.selected = selected; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java new file mode 100644 index 0000000..6489a71 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import java.util.Map; + +import org.apache.zeppelin.interpreter.InterpreterOption; + +/** + * NewInterpreterSetting rest api request message + * + */ +public class NewInterpreterSettingRequest { + String name; + String group; + InterpreterOption option; + Map<String, String> properties; + + public NewInterpreterSettingRequest() { + + } + + public String getName() { + return name; + } + + public String getGroup() { + return group; + } + + public Map<String, String> getProperties() { + return properties; + } + + public InterpreterOption getOption() { + return option; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java new file mode 100644 index 0000000..98f4ab7 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.rest.message; + +import java.util.Properties; + +import org.apache.zeppelin.interpreter.InterpreterOption; + +/** + * + */ +public class UpdateInterpreterSettingRequest { + InterpreterOption option; + Properties properties; + + public UpdateInterpreterSettingRequest(InterpreterOption option, + Properties properties) { + super(); + this.option = option; + this.properties = properties; + } + public InterpreterOption getOption() { + return option; + } + public Properties getProperties() { + return properties; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java new file mode 100644 index 0000000..8c8f9a7 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.server; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.util.resource.Resource; + +/** + * Simple servlet to dynamically set the Websocket port + * in the JavaScript sent to the client + */ +public class AppScriptServlet extends DefaultServlet { + + // Hash containing the possible scripts that contain the getPort() + // function originally defined in app.js + private static Set<String> scriptPaths = new HashSet<String>( + Arrays.asList( + "/scripts/scripts.js", + "/scripts/app.js" + ) + ); + + private int websocketPort; + + public AppScriptServlet(int websocketPort) { + this.websocketPort = websocketPort; + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, + IOException { + + // Process all requests not for the app script to the parent + // class + String uri = request.getRequestURI(); + if (!scriptPaths.contains(uri)) { + super.doGet(request, response); + return; + } + + // Read the script file chunk by chunk + Resource scriptFile = getResource(uri); + InputStream is = scriptFile.getInputStream(); + StringBuffer script = new StringBuffer(); + byte[] buffer = new byte[1024]; + while (is.available() > 0) { + int numRead = is.read(buffer); + if (numRead <= 0) { + break; + } + script.append(new String(buffer, 0, numRead, "UTF-8")); + } + + // Replace the string "function getPort(){...}" to return + // the proper value + int startIndex = script.indexOf("function getPort()"); + int endIndex = script.indexOf("}", startIndex); + + if (startIndex >= 0 && endIndex >= 0) { + String replaceString = "function getPort(){return " + websocketPort + "}"; + script.replace(startIndex, endIndex + 1, replaceString); + } + + response.getWriter().println(script.toString()); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java new file mode 100644 index 0000000..1524d5b --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.server; + +import java.io.IOException; +import java.text.DateFormat; +import java.util.Date; +import java.util.Locale; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * Cors filter + * + */ +public class CorsFilter implements Filter { + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) + throws IOException, ServletException { + if (((HttpServletRequest) request).getMethod().equals("OPTIONS")) { + HttpServletResponse resp = ((HttpServletResponse) response); + addCorsHeaders(resp); + return; + } + + if (response instanceof HttpServletResponse) { + HttpServletResponse alteredResponse = ((HttpServletResponse) response); + addCorsHeaders(alteredResponse); + } + filterChain.doFilter(request, response); + } + + private void addCorsHeaders(HttpServletResponse response) { + response.addHeader("Access-Control-Allow-Origin", "*"); + response.addHeader("Access-Control-Allow-Credentials", "true"); + response.addHeader("Access-Control-Allow-Headers", "authorization,Content-Type"); + response.addHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, HEAD, DELETE"); + DateFormat fullDateFormatEN = + DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL, new Locale("EN", "en")); + response.addHeader("Date", fullDateFormatEN.format(new Date())); + } + + @Override + public void destroy() {} + + @Override + public void init(FilterConfig filterConfig) throws ServletException {} +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java new file mode 100644 index 0000000..28a3bb8 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.server; + +import java.util.ArrayList; + +import javax.ws.rs.core.NewCookie; +import javax.ws.rs.core.Response.ResponseBuilder; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterSerializer; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * Json response builder. + * + * @author Leemoonsoo + * + * @param <T> + */ +public class JsonResponse<T> { + private javax.ws.rs.core.Response.Status status; + private String message; + private T body; + transient ArrayList<NewCookie> cookies; + transient boolean pretty = false; + + public JsonResponse(javax.ws.rs.core.Response.Status status) { + this.status = status; + this.message = null; + this.body = null; + + } + + public JsonResponse(javax.ws.rs.core.Response.Status status, String message) { + this.status = status; + this.message = message; + this.body = null; + } + + public JsonResponse(javax.ws.rs.core.Response.Status status, T body) { + this.status = status; + this.message = null; + this.body = body; + } + + public JsonResponse(javax.ws.rs.core.Response.Status status, String message, T body) { + this.status = status; + this.message = message; + this.body = body; + } + + public JsonResponse<T> setPretty(boolean pretty) { + this.pretty = pretty; + return this; + } + + /** + * Add cookie for building. + * + * @param newCookie + * @return + */ + public JsonResponse<T> addCookie(NewCookie newCookie) { + if (cookies == null) { + cookies = new ArrayList<NewCookie>(); + } + cookies.add(newCookie); + + return this; + } + + /** + * Add cookie for building. + * + * @param name + * @param value + * @return + */ + public JsonResponse<?> addCookie(String name, String value) { + return addCookie(new NewCookie(name, value)); + } + + @Override + public String toString() { + GsonBuilder gsonBuilder = new GsonBuilder() + .registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + if (pretty) { + gsonBuilder.setPrettyPrinting(); + } + Gson gson = gsonBuilder.create(); + return gson.toJson(this); + } + + public javax.ws.rs.core.Response.Status getCode() { + return status; + } + + public void setCode(javax.ws.rs.core.Response.Status status) { + this.status = status; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public T getBody() { + return body; + } + + public void setBody(T body) { + this.body = body; + } + + public javax.ws.rs.core.Response build() { + ResponseBuilder r = javax.ws.rs.core.Response.status(status).entity(this.toString()); + if (cookies != null) { + for (NewCookie nc : cookies) { + r.cookie(nc); + } + } + return r.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java new file mode 100644 index 0000000..1c9aa03 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.server; + +import java.io.File; +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Set; + +import javax.net.ssl.SSLContext; +import javax.servlet.DispatcherType; +import javax.ws.rs.core.Application; + +import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.rest.InterpreterRestApi; +import org.apache.zeppelin.rest.NotebookRestApi; +import org.apache.zeppelin.rest.ZeppelinRestApi; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.apache.zeppelin.socket.NotebookServer; +import org.apache.zeppelin.socket.SslWebSocketServerFactory; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.bio.SocketConnector; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.session.SessionHandler; +import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.webapp.WebAppContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.wordnik.swagger.jersey.config.JerseyJaxrsConfig; + +/** + * Main class of Zeppelin. + * + * @author Leemoonsoo + * + */ + +public class ZeppelinServer extends Application { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class); + + private SchedulerFactory schedulerFactory; + public static Notebook notebook; + + static NotebookServer notebookServer; + + private InterpreterFactory replFactory; + + public static void main(String[] args) throws Exception { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + conf.setProperty("args", args); + + final Server jettyServer = setupJettyServer(conf); + notebookServer = setupNotebookServer(conf); + + // REST api + final ServletContextHandler restApi = setupRestApiContextHandler(); + /** NOTE: Swagger-core is included via the web.xml in zeppelin-web + * But the rest of swagger is configured here + */ + final ServletContextHandler swagger = setupSwaggerContextHandler(conf); + + // Web UI + final WebAppContext webApp = setupWebAppContext(conf); + //Below is commented since zeppelin-docs module is removed. + //final WebAppContext webAppSwagg = setupWebAppSwagger(conf); + + // add all handlers + ContextHandlerCollection contexts = new ContextHandlerCollection(); + //contexts.setHandlers(new Handler[]{swagger, restApi, webApp, webAppSwagg}); + contexts.setHandlers(new Handler[]{swagger, restApi, webApp}); + jettyServer.setHandler(contexts); + + notebookServer.start(); + LOG.info("Start zeppelin server"); + jettyServer.start(); + LOG.info("Started"); + + Runtime.getRuntime().addShutdownHook(new Thread(){ + @Override public void run() { + LOG.info("Shutting down Zeppelin Server ... "); + try { + notebook.getInterpreterFactory().close(); + + jettyServer.stop(); + notebookServer.stop(); + } catch (Exception e) { + LOG.error("Error while stopping servlet container", e); + } + LOG.info("Bye"); + } + }); + + + // when zeppelin is started inside of ide (especially for eclipse) + // for graceful shutdown, input any key in console window + if (System.getenv("ZEPPELIN_IDENT_STRING") == null) { + try { + System.in.read(); + } catch (IOException e) { + } + System.exit(0); + } + + jettyServer.join(); + } + + private static Server setupJettyServer(ZeppelinConfiguration conf) + throws Exception { + + SocketConnector connector; + if (conf.useSsl()) { + connector = new SslSocketConnector(getSslContextFactory(conf)); + } + else { + connector = new SocketConnector(); + } + + // Set some timeout options to make debugging easier. + int timeout = 1000 * 30; + connector.setMaxIdleTime(timeout); + connector.setSoLingerTime(-1); + connector.setPort(conf.getServerPort()); + + final Server server = new Server(); + server.addConnector(connector); + + return server; + } + + private static NotebookServer setupNotebookServer(ZeppelinConfiguration conf) + throws Exception { + + NotebookServer server = new NotebookServer(conf.getWebSocketPort()); + + // Default WebSocketServer uses unencrypted connector, so only need to + // change the connector if SSL should be used. + if (conf.useSsl()) { + SslWebSocketServerFactory wsf = new SslWebSocketServerFactory(getSslContext(conf)); + wsf.setNeedClientAuth(conf.useClientAuth()); + server.setWebSocketFactory(wsf); + } + + return server; + } + + private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) + throws Exception { + + // Note that the API for the SslContextFactory is different for + // Jetty version 9 + SslContextFactory sslContextFactory = new SslContextFactory(); + + // Set keystore + sslContextFactory.setKeyStore(conf.getKeyStorePath()); + sslContextFactory.setKeyStoreType(conf.getKeyStoreType()); + sslContextFactory.setKeyStorePassword(conf.getKeyStorePassword()); + sslContextFactory.setKeyManagerPassword(conf.getKeyManagerPassword()); + + // Set truststore + sslContextFactory.setTrustStore(conf.getTrustStorePath()); + sslContextFactory.setTrustStoreType(conf.getTrustStoreType()); + sslContextFactory.setTrustStorePassword(conf.getTrustStorePassword()); + + sslContextFactory.setNeedClientAuth(conf.useClientAuth()); + + return sslContextFactory; + } + + private static SSLContext getSslContext(ZeppelinConfiguration conf) + throws Exception { + + SslContextFactory scf = getSslContextFactory(conf); + if (!scf.isStarted()) { + scf.start(); + } + return scf.getSslContext(); + } + + private static ServletContextHandler setupRestApiContextHandler() { + final ServletHolder cxfServletHolder = new ServletHolder(new CXFNonSpringJaxrsServlet()); + cxfServletHolder.setInitParameter("javax.ws.rs.Application", ZeppelinServer.class.getName()); + cxfServletHolder.setName("rest"); + cxfServletHolder.setForcedPath("rest"); + + final ServletContextHandler cxfContext = new ServletContextHandler(); + cxfContext.setSessionHandler(new SessionHandler()); + cxfContext.setContextPath("/api"); + cxfContext.addServlet(cxfServletHolder, "/*"); + + cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*", + EnumSet.allOf(DispatcherType.class)); + return cxfContext; + } + + /** + * Swagger core handler - Needed for the RestFul api documentation. + * + * @return ServletContextHandler of Swagger + */ + private static ServletContextHandler setupSwaggerContextHandler( + ZeppelinConfiguration conf) { + + // Configure Swagger-core + final ServletHolder swaggerServlet = + new ServletHolder(new JerseyJaxrsConfig()); + swaggerServlet.setName("JerseyJaxrsConfig"); + swaggerServlet.setInitParameter("api.version", "1.0.0"); + swaggerServlet.setInitParameter( + "swagger.api.basepath", + "http://localhost:" + conf.getServerPort() + "/api"); + swaggerServlet.setInitOrder(2); + + // Setup the handler + final ServletContextHandler handler = new ServletContextHandler(); + handler.setSessionHandler(new SessionHandler()); + // Bind Swagger-core to the url HOST/api-docs + handler.addServlet(swaggerServlet, "/api-docs/*"); + + // And we are done + return handler; + } + + private static WebAppContext setupWebAppContext( + ZeppelinConfiguration conf) { + + WebAppContext webApp = new WebAppContext(); + File warPath = new File(conf.getString(ConfVars.ZEPPELIN_WAR)); + if (warPath.isDirectory()) { + // Development mode, read from FS + // webApp.setDescriptor(warPath+"/WEB-INF/web.xml"); + webApp.setResourceBase(warPath.getPath()); + webApp.setContextPath("/"); + webApp.setParentLoaderPriority(true); + } else { + // use packaged WAR + webApp.setWar(warPath.getAbsolutePath()); + } + // Explicit bind to root + webApp.addServlet( + new ServletHolder(new AppScriptServlet(conf.getWebSocketPort())), + "/*" + ); + return webApp; + } + + /** + * Handles the WebApplication for Swagger-ui. + * + * @return WebAppContext with swagger ui context + */ + /*private static WebAppContext setupWebAppSwagger( + ZeppelinConfiguration conf) { + + WebAppContext webApp = new WebAppContext(); + File warPath = new File(conf.getString(ConfVars.ZEPPELIN_API_WAR)); + + if (warPath.isDirectory()) { + webApp.setResourceBase(warPath.getPath()); + } else { + webApp.setWar(warPath.getAbsolutePath()); + } + webApp.setContextPath("/docs"); + webApp.setParentLoaderPriority(true); + // Bind swagger-ui to the path HOST/docs + webApp.addServlet(new ServletHolder(new DefaultServlet()), "/docs/*"); + return webApp; + }*/ + + public ZeppelinServer() throws Exception { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + + this.schedulerFactory = new SchedulerFactory(); + + this.replFactory = new InterpreterFactory(conf); + notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer); + } + + @Override + public Set<Class<?>> getClasses() { + Set<Class<?>> classes = new HashSet<Class<?>>(); + return classes; + } + + @Override + public java.util.Set<java.lang.Object> getSingletons() { + Set<Object> singletons = new HashSet<Object>(); + + /** Rest-api root endpoint */ + ZeppelinRestApi root = new ZeppelinRestApi(); + singletons.add(root); + + NotebookRestApi notebookApi = new NotebookRestApi(notebook); + singletons.add(notebookApi); + + InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory); + singletons.add(interpreterApi); + + return singletons; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java new file mode 100644 index 0000000..a7b8b66 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.socket; + +import java.util.HashMap; +import java.util.Map; + +/** + * Zeppelin websocker massage template class. + * + * @author Leemoonsoo + * + */ +public class Message { + /** + * Representation of event type. + * + * @author Leemoonsoo + * + */ + public static enum OP { + GET_NOTE, // [c-s] client load note + // @param id note id + + NOTE, // [s-c] note info + // @param note serlialized Note object + + PARAGRAPH, // [s-c] paragraph info + // @param paragraph serialized paragraph object + + PROGRESS, // [s-c] progress update + // @param id paragraph id + // @param progress percentage progress + + NEW_NOTE, // [c-s] create new notebook + DEL_NOTE, // [c-s] delete notebook + // @param id note id + NOTE_UPDATE, + + RUN_PARAGRAPH, // [c-s] run paragraph + // @param id paragraph id + // @param paragraph paragraph content.ie. script + // @param config paragraph config + // @param params paragraph params + + COMMIT_PARAGRAPH, // [c-s] commit paragraph + // @param id paragraph id + // @param title paragraph title + // @param paragraph paragraph content.ie. script + // @param config paragraph config + // @param params paragraph params + + CANCEL_PARAGRAPH, // [c-s] cancel paragraph run + // @param id paragraph id + + MOVE_PARAGRAPH, // [c-s] move paragraph order + // @param id paragraph id + // @param index index the paragraph want to go + + INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph + // @param target index + + COMPLETION, // [c-s] ask completion candidates + // @param id + // @param buf current code + // @param cursor cursor position in code + + COMPLETION_LIST, // [s-c] send back completion candidates list + // @param id + // @param completions list of string + + LIST_NOTES, // [c-s] ask list of note + + NOTES_INFO, // [s-c] list of note infos + // @param notes serialized List<NoteInfo> object + + PARAGRAPH_REMOVE, + } + + public OP op; + public Map<String, Object> data = new HashMap<String, Object>(); + + public Message(OP op) { + this.op = op; + } + + public Message put(String k, Object v) { + data.put(k, v); + return this; + } + + public Object get(String k) { + return data.get(k); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java new file mode 100644 index 0000000..db5733e --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.socket; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.notebook.JobListenerFactory; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.JobListener; +import org.apache.zeppelin.scheduler.Job.Status; +import org.apache.zeppelin.server.ZeppelinServer; +import org.apache.zeppelin.socket.Message.OP; +import org.java_websocket.WebSocket; +import org.java_websocket.handshake.ClientHandshake; +import org.java_websocket.server.WebSocketServer; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; +import com.google.gson.Gson; + +/** + * Zeppelin websocket service. + * + * @author anthonycorbacho + */ +public class NotebookServer extends WebSocketServer implements JobListenerFactory { + + private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); + private static final int DEFAULT_PORT = 8282; + + private static void creatingwebSocketServerLog(int port) { + LOG.info("Create zeppelin websocket on port {}", port); + } + + Gson gson = new Gson(); + Map<String, List<WebSocket>> noteSocketMap = new HashMap<String, List<WebSocket>>(); + List<WebSocket> connectedSockets = new LinkedList<WebSocket>(); + + public NotebookServer() { + super(new InetSocketAddress(DEFAULT_PORT)); + creatingwebSocketServerLog(DEFAULT_PORT); + } + + public NotebookServer(int port) { + super(new InetSocketAddress(port)); + creatingwebSocketServerLog(port); + } + + private Notebook notebook() { + return ZeppelinServer.notebook; + } + + @Override + public void onOpen(WebSocket conn, ClientHandshake handshake) { + LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn + .getRemoteSocketAddress().getPort()); + synchronized (connectedSockets) { + connectedSockets.add(conn); + } + } + + @Override + public void onMessage(WebSocket conn, String msg) { + Notebook notebook = notebook(); + try { + Message messagereceived = deserializeMessage(msg); + LOG.info("RECEIVE << " + messagereceived.op); + /** Lets be elegant here */ + switch (messagereceived.op) { + case LIST_NOTES: + broadcastNoteList(); + break; + case GET_NOTE: + sendNote(conn, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, notebook); + break; + case DEL_NOTE: + removeNote(conn, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, notebook, messagereceived); + break; + default: + broadcastNoteList(); + break; + } + } catch (Exception e) { + LOG.error("Can't handle message", e); + } + } + + @Override + public void onClose(WebSocket conn, int code, String reason, boolean remote) { + LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn + .getRemoteSocketAddress().getPort()); + removeConnectionFromAllNote(conn); + synchronized (connectedSockets) { + connectedSockets.remove(conn); + } + } + + @Override + public void onError(WebSocket conn, Exception message) { + removeConnectionFromAllNote(conn); + synchronized (connectedSockets) { + connectedSockets.remove(conn); + } + } + + private Message deserializeMessage(String msg) { + Message m = gson.fromJson(msg, Message.class); + return m; + } + + private String serializeMessage(Message m) { + return gson.toJson(m); + } + + private void addConnectionToNote(String noteId, WebSocket socket) { + synchronized (noteSocketMap) { + removeConnectionFromAllNote(socket); // make sure a socket relates only a single note. + List<WebSocket> socketList = noteSocketMap.get(noteId); + if (socketList == null) { + socketList = new LinkedList<WebSocket>(); + noteSocketMap.put(noteId, socketList); + } + + if (socketList.contains(socket) == false) { + socketList.add(socket); + } + } + } + + private void removeConnectionFromNote(String noteId, WebSocket socket) { + synchronized (noteSocketMap) { + List<WebSocket> socketList = noteSocketMap.get(noteId); + if (socketList != null) { + socketList.remove(socket); + } + } + } + + private void removeNote(String noteId) { + synchronized (noteSocketMap) { + List<WebSocket> socketList = noteSocketMap.remove(noteId); + } + } + + private void removeConnectionFromAllNote(WebSocket socket) { + synchronized (noteSocketMap) { + Set<String> keys = noteSocketMap.keySet(); + for (String noteId : keys) { + removeConnectionFromNote(noteId, socket); + } + } + } + + private String getOpenNoteId(WebSocket socket) { + String id = null; + synchronized (noteSocketMap) { + Set<String> keys = noteSocketMap.keySet(); + for (String noteId : keys) { + List<WebSocket> sockets = noteSocketMap.get(noteId); + if (sockets.contains(socket)) { + id = noteId; + } + } + } + return id; + } + + private void broadcast(String noteId, Message m) { + LOG.info("SEND >> " + m.op); + synchronized (noteSocketMap) { + List<WebSocket> socketLists = noteSocketMap.get(noteId); + if (socketLists == null || socketLists.size() == 0) { + return; + } + for (WebSocket conn : socketLists) { + conn.send(serializeMessage(m)); + } + } + } + + private void broadcastAll(Message m) { + synchronized (connectedSockets) { + for (WebSocket conn : connectedSockets) { + conn.send(serializeMessage(m)); + } + } + } + + private void broadcastNote(Note note) { + broadcast(note.id(), new Message(OP.NOTE).put("note", note)); + } + + private void broadcastNoteList() { + Notebook notebook = notebook(); + List<Note> notes = notebook.getAllNotes(); + List<Map<String, String>> notesInfo = new LinkedList<Map<String, String>>(); + for (Note note : notes) { + Map<String, String> info = new HashMap<String, String>(); + info.put("id", note.id()); + info.put("name", note.getName()); + notesInfo.add(info); + } + broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); + } + + private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) { + String noteId = (String) fromMessage.get("id"); + if (noteId == null) { + return; + } + Note note = notebook.getNote(noteId); + if (note != null) { + addConnectionToNote(note.id(), conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + } + } + + private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) + throws SchedulerException, IOException { + String noteId = (String) fromMessage.get("id"); + String name = (String) fromMessage.get("name"); + Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + if (noteId == null) { + return; + } + if (config == null) { + return; + } + Note note = notebook.getNote(noteId); + if (note != null) { + boolean cronUpdated = isCronUpdated(config, note.getConfig()); + note.setName(name); + note.setConfig(config); + + if (cronUpdated) { + notebook.refreshCron(note.id()); + } + note.persist(); + + broadcastNote(note); + broadcastNoteList(); + } + } + + private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) { + boolean cronUpdated = false; + if (configA.get("cron") != null && configB.get("cron") != null + && configA.get("cron").equals(configB.get("cron"))) { + cronUpdated = true; + } else if (configA.get("cron") == null && configB.get("cron") == null) { + cronUpdated = false; + } else if (configA.get("cron") != null || configB.get("cron") != null) { + cronUpdated = true; + } + return cronUpdated; + } + + private void createNote(WebSocket conn, Notebook notebook) throws IOException { + Note note = notebook.createNote(); + note.addParagraph(); // it's an empty note. so add one paragraph + note.persist(); + broadcastNote(note); + broadcastNoteList(); + } + + private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + String noteId = (String) fromMessage.get("id"); + if (noteId == null) { + return; + } + Note note = notebook.getNote(noteId); + note.unpersist(); + notebook.removeNote(noteId); + removeNote(noteId); + broadcastNoteList(); + } + + private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + String paragraphId = (String) fromMessage.get("id"); + if (paragraphId == null) { + return; + } + Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); + Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + final Note note = notebook.getNote(getOpenNoteId(conn)); + Paragraph p = note.getParagraph(paragraphId); + p.settings.setParams(params); + p.setConfig(config); + p.setTitle((String) fromMessage.get("title")); + p.setText((String) fromMessage.get("paragraph")); + note.persist(); + broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); + } + + private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + final String paragraphId = (String) fromMessage.get("id"); + if (paragraphId == null) { + return; + } + final Note note = notebook.getNote(getOpenNoteId(conn)); + /** We dont want to remove the last paragraph */ + if (!note.isLastParagraph(paragraphId)) { + note.removeParagraph(paragraphId); + note.persist(); + broadcastNote(note); + } + } + + private void completion(WebSocket conn, Notebook notebook, Message fromMessage) { + String paragraphId = (String) fromMessage.get("id"); + String buffer = (String) fromMessage.get("buf"); + int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); + Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); + + if (paragraphId == null) { + conn.send(serializeMessage(resp)); + return; + } + + final Note note = notebook.getNote(getOpenNoteId(conn)); + List<String> candidates = note.completion(paragraphId, buffer, cursor); + resp.put("completions", candidates); + conn.send(serializeMessage(resp)); + } + + private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + final String paragraphId = (String) fromMessage.get("id"); + if (paragraphId == null) { + return; + } + + final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); + final Note note = notebook.getNote(getOpenNoteId(conn)); + note.moveParagraph(paragraphId, newIndex); + note.persist(); + broadcastNote(note); + } + + private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); + + final Note note = notebook.getNote(getOpenNoteId(conn)); + note.insertParagraph(index); + note.persist(); + broadcastNote(note); + } + + + private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + final String paragraphId = (String) fromMessage.get("id"); + if (paragraphId == null) { + return; + } + + final Note note = notebook.getNote(getOpenNoteId(conn)); + Paragraph p = note.getParagraph(paragraphId); + p.abort(); + } + + private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage) + throws IOException { + final String paragraphId = (String) fromMessage.get("id"); + if (paragraphId == null) { + return; + } + final Note note = notebook.getNote(getOpenNoteId(conn)); + Paragraph p = note.getParagraph(paragraphId); + String text = (String) fromMessage.get("paragraph"); + p.setText(text); + p.setTitle((String) fromMessage.get("title")); + Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); + p.settings.setParams(params); + Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + p.setConfig(config); + + // if it's the last paragraph, let's add a new one + boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId()); + if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { + note.addParagraph(); + } + note.persist(); + broadcastNote(note); + + try { + note.run(paragraphId); + } + catch (Exception ex) { + LOG.error("Exception from run", ex); + if (p != null) { + p.setReturn(new InterpreterResult( + InterpreterResult.Code.ERROR, ex.getMessage()), ex); + p.setStatus(Status.ERROR); + } + } + } + + /** + * Need description here. + * + */ + public static class ParagraphJobListener implements JobListener { + private NotebookServer notebookServer; + private Note note; + + public ParagraphJobListener(NotebookServer notebookServer, Note note) { + this.notebookServer = notebookServer; + this.note = note; + } + + @Override + public void onProgressUpdate(Job job, int progress) { + notebookServer.broadcast(note.id(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); + } + + @Override + public void beforeStatusChange(Job job, Status before, Status after) {} + + @Override + public void afterStatusChange(Job job, Status before, Status after) { + if (after == Status.ERROR) { + job.getException().printStackTrace(); + } + if (job.isTerminated()) { + LOG.info("Job {} is finished", job.getId()); + try { + note.persist(); + } catch (IOException e) { + e.printStackTrace(); + } + } + notebookServer.broadcastNote(note); + } + } + + @Override + public JobListener getParagraphJobListener(Note note) { + return new ParagraphJobListener(this, note); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java new file mode 100644 index 0000000..f44dc1f --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.socket; + +import java.io.IOException; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ExecutorService; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.java_websocket.SSLSocketChannel2; +import org.java_websocket.server.DefaultSSLWebSocketServerFactory; + +/** + * Extension of the java_websocket library's DefaultSslWebSocketServerFactory + * to require client side authentication during the SSL handshake + */ +public class SslWebSocketServerFactory + extends DefaultSSLWebSocketServerFactory { + + protected boolean needClientAuth; + + public SslWebSocketServerFactory(SSLContext sslcontext) { + super(sslcontext); + initAttributes(); + } + + public SslWebSocketServerFactory( + SSLContext sslcontext, + ExecutorService exec) { + + super(sslcontext, exec); + initAttributes(); + } + + protected void initAttributes() { + this.needClientAuth = false; + } + + @Override + public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key) + throws IOException { + + SSLEngine sslEngine = sslcontext.createSSLEngine(); + sslEngine.setUseClientMode(false); + sslEngine.setNeedClientAuth(needClientAuth); + return new SSLSocketChannel2( channel, sslEngine, exec, key ); + } + + public boolean getNeedClientAuth() { + return needClientAuth; + } + + public void setNeedClientAuth(boolean needClientAuth) { + this.needClientAuth = needClientAuth; + } +} +
