This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 525d3617b0 [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374) 525d3617b0 is described below commit 525d3617b08573dc0272cc94735721a09b21716a Author: Leomax_Sun <282130...@qq.com> AuthorDate: Fri Jun 3 20:56:17 2022 +0800 [ZEPPELIN-5744] Allow NoteManager for concurrent operation (#4374) --- .../org/apache/zeppelin/notebook/NoteManager.java | 19 ++-- .../apache/zeppelin/notebook/NoteManagerTest.java | 119 +++++++++++++++++++++ 2 files changed, 131 insertions(+), 7 deletions(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java index ba6e443f8b..e2d393af54 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import javax.inject.Inject; @@ -79,7 +80,7 @@ public class NoteManager { // build the tree structure of notes private void init() throws IOException { this.notesInfo = notebookRepo.list(AuthenticationInfo.ANONYMOUS).values().stream() - .collect(Collectors.toMap(NoteInfo::getId, NoteInfo::getPath)); + .collect(Collectors.toConcurrentMap(NoteInfo::getId, NoteInfo::getPath)); this.notesInfo.entrySet().stream() .forEach(entry -> { @@ -181,7 +182,11 @@ public class NoteManager { } else { addOrUpdateNoteNode(new NoteInfo(note)); noteCache.putNote(note); - this.notebookRepo.save(note, subject); + // Make sure to execute `notebookRepo.save()` successfully in concurrent context + // Otherwise, the NullPointerException will be thrown when invoking notebookRepo.get() in the following operations. + synchronized (this) { + this.notebookRepo.save(note, subject); + } } } @@ -218,7 +223,6 @@ public class NoteManager { public void moveNote(String noteId, String newNotePath, AuthenticationInfo subject) throws IOException { - String notePath = this.notesInfo.get(noteId); if (noteId == null) { throw new IOException("No metadata found for this note: " + noteId); } @@ -228,6 +232,7 @@ public class NoteManager { } // move the old NoteNode from notePath to newNotePath + String notePath = this.notesInfo.get(noteId); NoteNode noteNode = getNoteNode(notePath); noteNode.getParent().removeNote(getNoteName(notePath)); noteNode.setNotePath(newNotePath); @@ -317,10 +322,10 @@ public class NoteManager { */ public <T> T processNote(String noteId, boolean reload, NoteProcessor<T> noteProcessor) throws IOException { - String notePath = this.notesInfo.get(noteId); - if (notePath == null) { + if (this.notesInfo == null || noteId == null || !this.notesInfo.containsKey(noteId)) { return noteProcessor.process(null); } + String notePath = this.notesInfo.get(noteId); NoteNode noteNode = getNoteNode(notePath); return noteNode.loadAndProcessNote(reload, noteProcessor); } @@ -433,9 +438,9 @@ public class NoteManager { private NoteCache noteCache; // noteName -> NoteNode - private Map<String, NoteNode> notes = new HashMap<>(); + private Map<String, NoteNode> notes = new ConcurrentHashMap<>(); // folderName -> Folder - private Map<String, Folder> subFolders = new HashMap<>(); + private Map<String, Folder> subFolders = new ConcurrentHashMap<>(); public Folder(String name, NotebookRepo notebookRepo, NoteCache noteCache) { this.name = name; diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java index 822b6e87fe..f8d58dd272 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java @@ -28,6 +28,11 @@ import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -158,4 +163,118 @@ public class NoteManagerTest { assertFalse(noteManager.containsNote(noteNew2.getPath())); assertEquals(cacheThreshold, noteManager.getCacheSize()); } + + @Test + public void testConcurrentOperation() throws Exception { + int threshold = 10, noteNum = 150; + Map<Integer, String> notes = new ConcurrentHashMap<>(); + ExecutorService threadPool = Executors.newFixedThreadPool(threshold); + // Save note concurrently + ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, notes, "/prod/note%s"); + saveNote.exec(); + // Move note concurrently + ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, notes, "/dev/project_%s/my_note%s"); + moveNote.exec(); + // Move folder concurrently + ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, noteNum, notes, "/staging/note_%s/my_note%s"); + moveFolder.exec(); + // Remove note concurrently + ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, noteNum, notes, null); + removeNote.exec(); + threadPool.shutdown(); + } + + abstract class ConcurrentTask { + private ExecutorService threadPool; + private int noteNum; + private Map<Integer, String> notes; + private String pathPattern; + + public ConcurrentTask(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) { + this.threadPool = threadPool; + this.noteNum = noteNum; + this.notes = notes; + this.pathPattern = pathPattern; + } + + public abstract void run(int index) throws IOException; + + public void exec() throws Exception { + // Simulate concurrent operation + CountDownLatch latch = new CountDownLatch(noteNum); + for (int i = 0; i < noteNum; i++) { + int index = i; + threadPool.execute(() -> { + try { + this.run(index); + latch.countDown(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } + // wait till all tasks are completed with 5 seconds as timeout threshold + assertTrue(latch.await(5, TimeUnit.SECONDS)); + this.checkPathByPattern(); + } + + private void checkPathByPattern() throws IOException { + assertEquals(this.notes.size(), noteManager.getNotesInfo().size()); + if (notes.isEmpty()) return; + for (Integer key : this.notes.keySet()) { + String expectPath = String.format(this.pathPattern, key, key); + assertEquals(expectPath, noteManager.processNote(notes.get(key), n -> n).getPath()); + } + } + } + + class ConcurrentTaskSaveNote extends ConcurrentTask { + public ConcurrentTaskSaveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) { + super(threadPool, noteNum, notes, pathPattern); + } + + @Override + public void run(int index) throws IOException { + String tarPath = String.format(super.pathPattern, index, index); + Note note = createNote(tarPath); + noteManager.saveNote(note); + super.notes.put(index, note.getId()); + } + } + + class ConcurrentTaskMoveNote extends ConcurrentTask { + public ConcurrentTaskMoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) { + super(threadPool, noteNum, notes, pathPattern); + } + + @Override + public void run(int index) throws IOException { + String tarPath = String.format(super.pathPattern, index, index); + noteManager.moveNote(super.notes.get(index), tarPath, AuthenticationInfo.ANONYMOUS); + } + } + + class ConcurrentTaskMoveFolder extends ConcurrentTask { + public ConcurrentTaskMoveFolder(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) { + super(threadPool, noteNum, notes, pathPattern); + } + + @Override + public void run(int index) throws IOException { + String curPath = "/dev/project_" + index, tarPath = "/staging/note_" + index; + noteManager.moveFolder(curPath, tarPath, AuthenticationInfo.ANONYMOUS); + } + } + + class ConcurrentTaskRemoveNote extends ConcurrentTask { + public ConcurrentTaskRemoveNote(ExecutorService threadPool, int noteNum, Map<Integer, String> notes, String pathPattern) { + super(threadPool, noteNum, notes, pathPattern); + } + + @Override + public void run(int index) throws IOException { + noteManager.removeNote(super.notes.get(index), AuthenticationInfo.ANONYMOUS); + super.notes.remove(index); + } + } }