This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new bb17274 [ZEPPELIN-4068] Implement MongoNotebookRepo. bb17274 is described below commit bb172749a65bc3b4ef689a355b62c2168c7ae7e7 Author: yx91490 <yx91...@126.com> AuthorDate: Thu Aug 29 09:28:46 2019 +0800 [ZEPPELIN-4068] Implement MongoNotebookRepo. ### What is this PR for? The format of the note storage file name changed in 0.9; this is the corresponding implementation of MongoNotebookRepo. Please refer to [ZEPPELIN-2619](https://issues.apache.org/jira/browse/ZEPPELIN-2619). This patch uses a folder collection to store the path hierarchy. ### What type of PR is it? [Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring] Feature ### Todos * [ ] - This implementation needs MongoDB 3.4 or later (listing notes relies on the `$graphLookup` aggregation stage). ### What is the Jira issue? [https://issues.apache.org/jira/browse/ZEPPELIN-4068](https://issues.apache.org/jira/browse/ZEPPELIN-4068) ### How should this be tested? Manually. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? 
no Author: yx91490 <yx91...@126.com> Closes #3432 from yx91490/mongo and squashes the following commits: 6b832db13 [yx91490] remove apache license 5970f912e [yx91490] [ZEPPELIN-4068] use get to assert 19689d029 [yx91490] [ZEPPELIN-4068] spell correct 3c8e0a46d [yx91490] [ZEPPELIN-4068] Add license header fe790879d [yx91490] [ZEPPELIN-4068] Add license header 11c0c5244 [yx91490] [ZEPPELIN-4068] Add comment f232ee7a5 [yx91490] [ZEPPELIN-4068] add Embedded MongoDB license 14d828ef5 [yx91490] [ZEPPELIN-4068] Implement MongoNotebookRepo,based on 0.9 5c0b2fbe6 [yx91490] [ZEPPELIN-4068] Implement MongoNotebookRepo --- LICENSE | 1 + conf/zeppelin-site.xml.template | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 4 + zeppelin-plugins/notebookrepo/mongo/pom.xml | 75 ++++ .../apache/zeppelin/notebook/repo/AutoLock.java | 24 ++ .../zeppelin/notebook/repo/AutoReadWriteLock.java | 57 +++ .../zeppelin/notebook/repo/MongoNotebookRepo.java | 470 +++++++++++++++++++++ .../notebook/repo/OldMongoNotebookRepo.java | 235 +++++++++++ .../notebook/repo/MongoNotebookRepoTest.java | 127 ++++++ .../zeppelin/notebook/repo/ToPathArrayTest.java | 101 +++++ zeppelin-plugins/pom.xml | 1 + 11 files changed, 1096 insertions(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index f75c453..40c1143 100644 --- a/LICENSE +++ b/LICENSE @@ -261,6 +261,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version (Apache 2.0) jarchivelib (https://github.com/thrau/jarchivelib) (Apache 2.0) Google Cloud Client Library for Java (https://github.com/GoogleCloudPlatform/google-cloud-java) (Apache 2.0) concurrentunit (https://github.com/jhalterman/concurrentunit) + (Apache 2.0) Embedded MongoDB (https://github.com/flapdoodle-oss/de.flapdoodle.embed.mongo) ======================================================================== BSD 3-Clause licenses diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 1b88e46..e02722b 100755 --- 
a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -286,7 +286,7 @@ <property> <name>zeppelin.notebook.mongo.autoimport</name> <value>false</value> - <description>import local notes into MongoDB automatically on startup</description> + <description>import local notes into MongoDB automatically on startup, reset to false after import to avoid repeated import</description> </property> --> diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index c6d63ea..88b8e3c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -465,6 +465,9 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION); } + public String getMongoFolder() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_FOLDER); + } public boolean getMongoAutoimport() { return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT); } @@ -839,6 +842,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"), ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", "zeppelin"), ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"), + ZEPPELIN_NOTEBOOK_MONGO_FOLDER("zeppelin.notebook.mongo.folder", "folders"), ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"), ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false), ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", diff --git a/zeppelin-plugins/notebookrepo/mongo/pom.xml b/zeppelin-plugins/notebookrepo/mongo/pom.xml new file mode 100644 index 0000000..dcd19a6 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/pom.xml @@ 
-0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zengine-plugins-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../../../zeppelin-plugins</relativePath> + </parent> + + <artifactId>notebookrepo-mongo</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + + <name>Zeppelin: Plugin MongoNotebookRepo</name> + <description>NotebookRepo implementation based on MongoDB</description> + + <properties> + <plugin.name>NotebookRepo/MongoNotebookRepo</plugin.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>3.4.1</version> + </dependency> + <dependency> + <groupId>de.flapdoodle.embed</groupId> + <artifactId>de.flapdoodle.embed.mongo</artifactId> + <version>2.2.0</version> + <scope>test</scope> + <exclusions> + 
<exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> diff --git a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/AutoLock.java b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/AutoLock.java new file mode 100644 index 0000000..74b4f9b --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/AutoLock.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.notebook.repo; + +import java.io.Closeable; + +public interface AutoLock extends Closeable { + void close(); +} diff --git a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/AutoReadWriteLock.java b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/AutoReadWriteLock.java new file mode 100644 index 0000000..1d9583e --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/AutoReadWriteLock.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * wrap ReentrantReadWriteLock with Closeable interface, to avoid + * call lock's close() method every time. 
+ */ +public class AutoReadWriteLock { + + private final ReentrantReadWriteLock rwlock; + + public AutoReadWriteLock() { + this(new ReentrantReadWriteLock()); + } + + public AutoReadWriteLock(ReentrantReadWriteLock rwlock) { + this.rwlock = rwlock; + } + + public AutoLock lockForRead() { + rwlock.readLock().lock(); + return new AutoLock() { + @Override + public void close() { + rwlock.readLock().unlock(); + } + }; + } + + public AutoLock lockForWrite() { + rwlock.writeLock().lock(); + return new AutoLock() { + @Override + public void close() { + rwlock.writeLock().unlock(); + } + }; + } +} diff --git a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java new file mode 100644 index 0000000..9fdd180 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepo.java @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.notebook.repo; + +import static com.mongodb.client.model.Filters.and; +import static com.mongodb.client.model.Filters.eq; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.bson.types.ObjectId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertManyOptions; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.Updates; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; + +/** + * Backend for storing Notebook on MongoDB. 
+ */ +public class MongoNotebookRepo implements NotebookRepo { + + private static final Logger LOG = LoggerFactory.getLogger(MongoNotebookRepo.class); + + private ZeppelinConfiguration conf; + + private MongoClient client; + + private MongoDatabase db; + + private MongoCollection<Document> notes; + + private MongoCollection<Document> folders; + + private String folderName; + + private AutoReadWriteLock lock = new AutoReadWriteLock(); + + public MongoNotebookRepo() { + } + + @Override + public void init(ZeppelinConfiguration zConf) throws IOException { + this.conf = zConf; + client = new MongoClient(new MongoClientURI(conf.getMongoUri())); + db = client.getDatabase(conf.getMongoDatabase()); + notes = db.getCollection(conf.getMongoCollection()); + folderName = conf.getMongoFolder(); + folders = db.getCollection(folderName); + + if (conf.getMongoAutoimport()) { + // import local notes into MongoDB + insertFileSystemNotes(); + } + } + + /** + * If environment variable ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT is true, + * this method will insert local notes into MongoDB on startup. + * If a note already exists in MongoDB, skip it. 
+ */ + private void insertFileSystemNotes() throws IOException { + NotebookRepo vfsRepo = new VFSNotebookRepo(); + vfsRepo.init(this.conf); + Map<String, NoteInfo> infos = vfsRepo.list(null); + + try (AutoLock autoLock = lock.lockForWrite()) { + for (NoteInfo info : infos.values()) { + Note note = vfsRepo.get(info.getId(), info.getPath(), null); + saveOrIgnore(note, null); + } + } + + vfsRepo.close(); + } + + @Override + public Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException { + LOG.debug("list repo."); + Map<String, NoteInfo> infos = new HashMap<>(); + + Document match = new Document("$match", new Document(Fields.IS_DIR, false)); + Document graphLookup = new Document("$graphLookup", + new Document("from", folderName) + .append("startWith", "$" + Fields.PID) + .append("connectFromField", Fields.PID) + .append("connectToField", Fields.ID) + .append("as", Fields.FULL_PATH)); + + try (AutoLock autoLock = lock.lockForRead()) { + ArrayList<Document> list = Lists.newArrayList(match, graphLookup); + AggregateIterable<Document> aggregate = folders.aggregate(list); + for (Document document : aggregate) { + String id = document.getString(Fields.ID); + String name = document.getString(Fields.NAME); + List<Document> fullPath = document.get(Fields.FULL_PATH, List.class); + + StringBuilder sb = new StringBuilder(); + for (Document pathNode : fullPath) { + sb.append("/").append(pathNode.getString(Fields.NAME)); + } + + String fullPathStr = sb.append("/").append(name).toString(); + + NoteInfo noteInfo = new NoteInfo(id, fullPathStr); + infos.put(id, noteInfo); + } + } + + return infos; + } + + @Override + public Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException { + LOG.debug("get note, noteId: {}, notePath:{}", noteId, notePath); + + return getNote(noteId, notePath); + } + + private Note getNote(String noteId, String notePath) throws IOException { + Document doc = notes.find(eq(Fields.ID, noteId)).first(); + if (doc 
== null) { + throw new IOException("Note '" + noteId + "' in path '" + notePath + "'not found"); + } + + return documentToNote(doc); + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + LOG.debug("save note, note: {}", note); + String[] pathArray = toPathArray(note.getPath(), false); + + try (AutoLock autoLock = lock.lockForWrite()) { + String pId = completeFolder(pathArray); + saveNote(note); + saveNotePath(note.getId(), note.getName(), pId); + } + } + + private void saveOrIgnore(Note note, AuthenticationInfo subject) { + try { + String[] pathArray = toPathArray(note.getPath(), false); + + String pId = completeFolder(pathArray); + saveNoteOrIgnore(note); + saveNotePathOrIgnore(note.getId(), note.getName(), pId); + } catch (Exception e) { + LOG.warn("ignore error when insert note '{}': {}", note, e.getMessage()); + } + } + + /** + * save note to path. + * + * @param noteId note id + * @param pId note parent folder id + */ + private void saveNotePath(String noteId, String noteName, String pId) { + Document filter = new Document(Fields.ID, noteId); + Document doc = new Document(Fields.ID, noteId) + .append(Fields.PID, pId) + .append(Fields.IS_DIR, false) + .append(Fields.NAME, noteName); + + folders.replaceOne(filter, doc, new UpdateOptions().upsert(true)); + } + + private void saveNotePathOrIgnore(String noteId, String noteName, String pId) { + Document doc = new Document(Fields.ID, noteId) + .append(Fields.PID, pId) + .append(Fields.IS_DIR, false) + .append(Fields.NAME, noteName); + + folders.insertMany(Collections.singletonList(doc), new InsertManyOptions().ordered(false)); + } + + private void saveNote(Note note) { + Document doc = noteToDocument(note); + notes.replaceOne(eq(Fields.ID, note.getId()), doc, new UpdateOptions().upsert(true)); + } + + private void saveNoteOrIgnore(Note note) { + Document doc = noteToDocument(note); + notes.insertMany(Collections.singletonList(doc), new InsertManyOptions().ordered(false)); 
+ } + + @Override + public void move(String noteId, String notePath, String newNotePath, + AuthenticationInfo subject) throws IOException { + LOG.debug("move note, noteId: {}, notePath: {}, newNotePath: {}", + noteId, notePath, newNotePath); + if (StringUtils.equals(notePath, newNotePath)) { + return; + } + String[] pathArray = toPathArray(newNotePath, true); + String[] parentPathArray = Arrays.copyOfRange(pathArray, 0, pathArray.length - 1); + String noteName = pathArray[pathArray.length - 1]; + + try (AutoLock autoLock = lock.lockForWrite()) { + String pId = completeFolder(parentPathArray); + moveNote(noteId, pId, noteName); + } + } + + private void moveNote(String noteId, String parentId, String noteName) { + Document doc = new Document("$set", + new Document(Fields.PID, parentId) + .append(Fields.NAME, noteName)); + + folders.updateOne(eq(Fields.ID, noteId), doc); + notes.updateOne(eq(Fields.ID, noteId), Updates.set(Fields.NAME, noteName)); + } + + @Override + public void move(String folderPath, String newFolderPath, + AuthenticationInfo subject) throws IOException { + LOG.debug("move folder, folderPath: {}, newFolderPath: {}", folderPath, newFolderPath); + if (StringUtils.equals(folderPath, newFolderPath)) { + return; + } + + String[] pathArray = toPathArray(folderPath, true); + String[] newPathArray = toPathArray(newFolderPath, true); + String[] newFolderParentArray = Arrays.copyOfRange(newPathArray, 0, newPathArray.length - 1); + + try (AutoLock autoLock = lock.lockForWrite()) { + String id = findFolder(pathArray); + String newPId = completeFolder(newFolderParentArray); + String newFolderName = newPathArray[newPathArray.length - 1]; + + Document doc = new Document("$set", + new Document(Fields.ID, id) + .append(Fields.PID, newPId) + .append(Fields.IS_DIR, true) + .append(Fields.NAME, newFolderName)); + + folders.updateOne(eq(Fields.ID, id), doc); + } + } + + @Override + public void remove(String noteId, String notePath, + AuthenticationInfo subject) throws 
IOException { + LOG.debug("remove note, noteId:{}, notePath:{}", noteId, notePath); + + try (AutoLock autoLock = lock.lockForWrite()) { + folders.deleteOne(eq(Fields.ID, noteId)); + notes.deleteOne(eq(Fields.ID, noteId)); + + //clean empty folder + String[] pathArray = toPathArray(notePath, false); + for (int i = pathArray.length; i >= 0; i--) { + String[] current = Arrays.copyOfRange(pathArray, 0, i); + String folderId = findFolder(current); + boolean isEmpty = folders.count(eq(Fields.PID, folderId)) <= 0; + if (isEmpty) { + folders.deleteOne(eq(Fields.ID, folderId)); + } else { + break; + } + } + } + } + + @Override + public void remove(String folderPath, AuthenticationInfo subject) throws IOException { + LOG.debug("remove folder, folderPath: {}", folderPath); + String[] pathArray = toPathArray(folderPath, true); + + try (AutoLock autoLock = lock.lockForWrite()) { + String id = findFolder(pathArray); + FindIterable<Document> iter = folders.find(eq(Fields.PID, id)); + for (Document node : iter) { + String nodeId = node.getString(Fields.ID); + Boolean isDir = node.getBoolean(Fields.IS_DIR); + String nodeName = node.getString(Fields.NAME); + + if (isDir) { + StringBuilder sb = new StringBuilder(); + for (String s : pathArray) { + sb.append("/").append(s); + } + sb.append("/").append(nodeName); + + String nodePath = sb.toString(); + remove(nodePath, subject); + } else { + folders.deleteOne(eq(Fields.ID, nodeId)); + notes.deleteOne(eq(Fields.ID, nodeId)); + } + + folders.deleteOne(eq(Fields.ID, nodeId)); + } + } + } + + @Override + public void close() { + client.close(); + } + + @Override + public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + return Collections.emptyList(); + } + + @Override + public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + } + + /** + * create until parent folder if not exists. 
+ * + * @param splitPath path to completed. + * @return direct parent folder id + */ + private String completeFolder(String[] splitPath) { + String pId = "0"; + for (String currentPath : splitPath) { + Document query = new Document(Fields.PID, pId) + .append(Fields.IS_DIR, true) + .append(Fields.NAME, currentPath); + + String cId = new ObjectId().toString(); + Document doc = new Document("$setOnInsert", + new Document(Fields.ID, cId) + .append(Fields.PID, pId) + .append(Fields.IS_DIR, true) + .append(Fields.NAME, currentPath)); + + Document exist = folders.find(query).first(); + if (exist == null) { + folders.updateOne(query, doc, new UpdateOptions().upsert(true)); + pId = cId; + } else { + pId = exist.getString(Fields.ID); + } + } + + return pId; + } + + /** + * @param splitPath folder path to find + * @return direct parent folder id + */ + private String findFolder(String[] splitPath) { + String pId = "0"; + if ((splitPath.length == 1 && "".equals(splitPath[0])) + || ArrayUtils.isEmpty(splitPath)) { + return pId; + } + for (String currentPath : splitPath) { + Bson where = and(eq(Fields.PID, pId), + eq(Fields.IS_DIR, true), + eq(Fields.NAME, currentPath)); + Document node = folders.find(where).first(); + if (null == node) { + throw new IllegalStateException("folder not found in path:" + currentPath); + } + + pId = node.getString(Fields.ID); + } + + return pId; + } + + /** + * e.g. "/a/b" => [a, b]. 
+ * + * @param notePath path in str + * @param includeLast whether return file/folder name in path + * @return path in array + */ + String[] toPathArray(String notePath, boolean includeLast) { + if (null == notePath || notePath.length() == 0) { + throw new NullPointerException("notePath is null"); + } + //replace multiple "/" to one "/" + notePath = notePath.replaceAll("/+", "/"); + + if ("/".equals(notePath)) { + return ArrayUtils.EMPTY_STRING_ARRAY; + } + + //remove leading "/" + if (notePath.startsWith("/")) { + notePath = notePath.substring(1); + } + + String[] arr = notePath.split("/"); + + return includeLast ? arr : Arrays.copyOfRange(arr, 0, arr.length - 1); + } + + /** + * Convert document to note. + */ + private Note documentToNote(Document doc) { + // document to JSON + String json = doc.toJson(); + // JSON to note + return Note.fromJson(json); + } + + /** + * Convert note to document. + */ + private Document noteToDocument(Note note) { + // note to JSON + String json = note.toJson(); + // JSON to document + Document doc = Document.parse(json); + // set object id as note id + doc.put(Fields.ID, note.getId()); + return doc; + } + + private class Fields { + + private static final String ID = "_id"; + + private static final String NAME = "name"; + + private static final String IS_DIR = "isDir"; + + /** + * parent folder id. 
+ */ + private static final String PID = "pId"; + + private static final String FULL_PATH = "fullPath"; + } +} diff --git a/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java new file mode 100644 index 0000000..0cd07eb --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/src/main/java/org/apache/zeppelin/notebook/repo/OldMongoNotebookRepo.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.notebook.repo; + +import static com.mongodb.client.model.Filters.eq; +import static com.mongodb.client.model.Filters.in; +import static com.mongodb.client.model.Filters.type; +import org.bson.BsonType; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import com.mongodb.MongoBulkWriteException; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.bulk.BulkWriteError; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertManyOptions; +import com.mongodb.client.model.UpdateOptions; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; + +/** + * Backend for storing Notebook on MongoDB + */ +public class OldMongoNotebookRepo implements OldNotebookRepo { + private static final Logger LOG = LoggerFactory.getLogger(MongoNotebookRepo.class); + + private ZeppelinConfiguration conf; + private MongoClient mongo; + private MongoDatabase db; + private MongoCollection<Document> coll; + + @Override + public void init(ZeppelinConfiguration zConf) throws IOException { + this.conf = zConf; + + mongo = new MongoClient(new MongoClientURI(conf.getMongoUri())); + db = mongo.getDatabase(conf.getMongoDatabase()); + coll = db.getCollection(conf.getMongoCollection()); + + if (conf.getMongoAutoimport()) { + // import local notes into MongoDB + insertFileSystemNotes(); + } + } + + /** + * If environment variable ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT is true, + * this method will insert local notes into 
MongoDB on startup. + * If a note already exists in MongoDB, skip it. + */ + private void insertFileSystemNotes() throws IOException { + LinkedList<Document> docs = new LinkedList<>(); // docs to be imported + OldNotebookRepo vfsRepo = new OldVFSNotebookRepo(); + vfsRepo.init(this.conf); + List<OldNoteInfo> infos = vfsRepo.list(null); + // collect notes to be imported + for (OldNoteInfo info : infos) { + Note note = vfsRepo.get(info.getId(), null); + Document doc = noteToDocument(note); + docs.add(doc); + } + + /* + * 'ordered(false)' option allows to proceed bulk inserting even though + * there are duplicated documents. The duplicated documents will be skipped + * and print a WARN log. + */ + try { + coll.insertMany(docs, new InsertManyOptions().ordered(false)); + } catch (MongoBulkWriteException e) { + printDuplicatedException(e); //print duplicated document warning log + } + + vfsRepo.close(); // it does nothing for now but maybe in the future... + } + + /** + * MongoBulkWriteException contains error messages that inform + * which documents were duplicated. This method catches those ID and print them. 
+ * @param e + */ + private void printDuplicatedException(MongoBulkWriteException e) { + List<BulkWriteError> errors = e.getWriteErrors(); + for (BulkWriteError error : errors) { + String msg = error.getMessage(); + Pattern pattern = Pattern.compile("[A-Z0-9]{9}"); // regex for note ID + Matcher matcher = pattern.matcher(msg); + if (matcher.find()) { // if there were a note ID + String noteId = matcher.group(); + LOG.warn("Note " + noteId + " not inserted since already exists in MongoDB"); + } + } + } + + @Override + public List<OldNoteInfo> list(AuthenticationInfo subject) throws IOException { + syncId(); + + List<OldNoteInfo> infos = new LinkedList<>(); + MongoCursor<Document> cursor = coll.find().iterator(); + + while (cursor.hasNext()) { + Document doc = cursor.next(); + Note note = documentToNote(doc); + OldNoteInfo info = new OldNoteInfo(note); + infos.add(info); + } + + cursor.close(); + + return infos; + } + + /** + * Find documents of which type of _id is object ID, and change it to note ID. 
+ * Since updating _id field is not allowed, remove original documents and insert + * new ones with string _id(note ID) + */ + private void syncId() { + // find documents whose id type is object id + MongoCursor<Document> cursor = coll.find(type("_id", BsonType.OBJECT_ID)).iterator(); + // if there is no such document, exit + if (!cursor.hasNext()) + return; + + List<ObjectId> oldDocIds = new LinkedList<>(); // document ids need to update + List<Document> updatedDocs = new LinkedList<>(); // new documents to be inserted + + while (cursor.hasNext()) { + Document doc = cursor.next(); + // store original _id + ObjectId oldId = doc.getObjectId("_id"); + oldDocIds.add(oldId); + // store the document with string _id (note id) + String noteId = doc.getString("id"); + doc.put("_id", noteId); + updatedDocs.add(doc); + } + + coll.insertMany(updatedDocs); + coll.deleteMany(in("_id", oldDocIds)); + + cursor.close(); + } + + /** + * Convert document to note + */ + private Note documentToNote(Document doc) { + // document to JSON + String json = doc.toJson(); + // JSON to note + return Note.fromJson(json); + } + + /** + * Convert note to document + */ + private Document noteToDocument(Note note) { + // note to JSON + String json = note.toJson(); + // JSON to document + Document doc = Document.parse(json); + // set object id as note id + doc.put("_id", note.getId()); + return doc; + } + + @Override + public Note get(String noteId, AuthenticationInfo subject) throws IOException { + Document doc = coll.find(eq("_id", noteId)).first(); + + if (doc == null) { + throw new IOException("Note " + noteId + "not found"); + } + + return documentToNote(doc); + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + Document doc = noteToDocument(note); + coll.replaceOne(eq("_id", note.getId()), doc, new UpdateOptions().upsert(true)); + } + + @Override + public void remove(String noteId, AuthenticationInfo subject) throws IOException { + 
coll.deleteOne(eq("_id", noteId)); + } + + @Override + public void close() { + mongo.close(); + } + + @Override + public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + return Collections.emptyList(); + } + + @Override + public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + } + +} diff --git a/zeppelin-plugins/notebookrepo/mongo/src/test/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepoTest.java b/zeppelin-plugins/notebookrepo/mongo/src/test/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepoTest.java new file mode 100644 index 0000000..9c103cb --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/src/test/java/org/apache/zeppelin/notebook/repo/MongoNotebookRepoTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.notebook.repo; + +import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI; +import static org.junit.Assert.assertEquals; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Map; +import de.flapdoodle.embed.mongo.MongodExecutable; +import de.flapdoodle.embed.mongo.MongodStarter; +import de.flapdoodle.embed.mongo.config.IMongodConfig; +import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; +import de.flapdoodle.embed.mongo.config.Net; +import de.flapdoodle.embed.mongo.distribution.Version; +import de.flapdoodle.embed.process.runtime.Network; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.user.AuthenticationInfo; + +public class MongoNotebookRepoTest { + + private MongodExecutable mongodExecutable; + + private ZeppelinConfiguration zConf; + + private MongoNotebookRepo notebookRepo; + + @Before + public void setUp() throws IOException { + String bindIp = "localhost"; + int port = new ServerSocket(0).getLocalPort(); + + IMongodConfig mongodConfig = new MongodConfigBuilder() + .version(Version.Main.PRODUCTION) + .net(new Net(bindIp, port, Network.localhostIsIPv6())) + .build(); + + mongodExecutable = MongodStarter.getDefaultInstance() + .prepare(mongodConfig); + mongodExecutable.start(); + + System.setProperty(ZEPPELIN_NOTEBOOK_MONGO_URI.getVarName(), "mongodb://" + bindIp + ":" + port); + zConf = new ZeppelinConfiguration(); + notebookRepo = new MongoNotebookRepo(); + notebookRepo.init(zConf); + } + + @After + public void tearDown() throws IOException { + if (mongodExecutable != null) { + mongodExecutable.stop(); + } + } + + @Test + public void testBasics() throws IOException { + assertEquals(0, 
notebookRepo.list(AuthenticationInfo.ANONYMOUS).size()); + + // create note1 + Note note1 = new Note(); + note1.setPath("/my_project/my_note1"); + Paragraph p1 = note1.insertNewParagraph(0, AuthenticationInfo.ANONYMOUS); + p1.setText("%md hello world"); + p1.setTitle("my title"); + notebookRepo.save(note1, AuthenticationInfo.ANONYMOUS); + + Map<String, NoteInfo> noteInfos = notebookRepo.list(AuthenticationInfo.ANONYMOUS); + assertEquals(1, noteInfos.size()); + Note note1Loaded = notebookRepo.get(note1.getId(), note1.getPath(), AuthenticationInfo.ANONYMOUS); + assertEquals(note1.getId(), note1Loaded.getId()); + assertEquals(note1.getName(), note1Loaded.getName()); + + // create note2 + Note note2 = new Note(); + note2.setPath("/my_note2"); + Paragraph p2 = note2.insertNewParagraph(0, AuthenticationInfo.ANONYMOUS); + p2.setText("%md hello world2"); + p2.setTitle("my title2"); + notebookRepo.save(note2, AuthenticationInfo.ANONYMOUS); + + noteInfos = notebookRepo.list(AuthenticationInfo.ANONYMOUS); + assertEquals(2, noteInfos.size()); + + // move note2 + String newPath = "/my_project2/my_note2"; + notebookRepo.move(note2.getId(), note2.getPath(), "/my_project2/my_note2", AuthenticationInfo.ANONYMOUS); + + Note note3 = notebookRepo.get(note2.getId(), newPath, AuthenticationInfo.ANONYMOUS); + assertEquals(note2, note3); + + // move folder + notebookRepo.move("/my_project2", "/my_project3/my_project2", AuthenticationInfo.ANONYMOUS); + noteInfos = notebookRepo.list(AuthenticationInfo.ANONYMOUS); + assertEquals(2, noteInfos.size()); + + Note note4 = notebookRepo.get(note3.getId(), "/my_project3/my_project2/my_note2", AuthenticationInfo.ANONYMOUS); + assertEquals(note3, note4); + + // remove note1 + notebookRepo.remove(note1.getId(), note1.getPath(), AuthenticationInfo.ANONYMOUS); + assertEquals(1, notebookRepo.list(AuthenticationInfo.ANONYMOUS).size()); + + notebookRepo.remove("/my_project3", AuthenticationInfo.ANONYMOUS); + assertEquals(0, 
notebookRepo.list(AuthenticationInfo.ANONYMOUS).size()); + } +} \ No newline at end of file diff --git a/zeppelin-plugins/notebookrepo/mongo/src/test/java/org/apache/zeppelin/notebook/repo/ToPathArrayTest.java b/zeppelin-plugins/notebookrepo/mongo/src/test/java/org/apache/zeppelin/notebook/repo/ToPathArrayTest.java new file mode 100644 index 0000000..5796c49 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/mongo/src/test/java/org/apache/zeppelin/notebook/repo/ToPathArrayTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.notebook.repo; + +import static org.junit.Assert.assertArrayEquals; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; + + +@RunWith(Parameterized.class) +public class ToPathArrayTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private MongoNotebookRepo repo = new MongoNotebookRepo(); + + private String pathStr; + + private boolean includeLast; + + private String[] expactPathArray; + + public ToPathArrayTest(String pathStr, boolean includeLast, String[] expactPathArray) { + this.pathStr = pathStr; + this.includeLast = includeLast; + this.expactPathArray = expactPathArray; + } + + @Parameterized.Parameters + public static Collection params() { + Object[][] arrs = { + {null, true, null}, + {null, false, null}, + {"", true, null}, + {"", false, null}, + {"/", true, new String[0]}, + {"/", false, new String[0]}, + + {"/abc", true, new String[]{"abc"}}, + {"/abc/", true, new String[]{"abc"}}, + {"/a/b/c", true, new String[]{"a", "b", "c"}}, + {"/a/b//c/", true, new String[]{"a", "b", "c"}}, + + {"/abc", false, new String[]{}}, + {"/abc/", false, new String[]{}}, + {"/a/b/c", false, new String[]{"a", "b"}}, + {"/a/b//c/", false, new String[]{"a", "b"}}, + + {"abc", true, new String[]{"abc"}}, + {"abc/", true, new String[]{"abc"}}, + {"a/b/c", true, new String[]{"a", "b", "c"}}, + {"a/b//c/", true, new String[]{"a", "b", "c"}}, + + {"abc", false, new String[]{}}, + {"abc/", false, new String[]{}}, + {"a/b/c", false, new String[]{"a", "b"}}, + {"a/b//c/", false, new String[]{"a", "b"}}, + }; + return Arrays.asList(arrs); + } + + @Test + public void runTest() { + if (expactPathArray == null) { + runForThrow(); + } else { + runNormally(); + } + } + + private void runForThrow() { + thrown.expect(NullPointerException.class); + 
runNormally(); + } + + private void runNormally() { + String[] pathArray = repo.toPathArray(pathStr, includeLast); + assertArrayEquals(expactPathArray, pathArray); + } +} diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml index bca6421..0586c6d 100644 --- a/zeppelin-plugins/pom.xml +++ b/zeppelin-plugins/pom.xml @@ -42,6 +42,7 @@ <module>notebookrepo/gcs</module> <module>notebookrepo/zeppelin-hub</module> <module>notebookrepo/filesystem</module> + <module>notebookrepo/mongo</module> <module>launcher/k8s-standard</module> <module>launcher/cluster</module>