This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 2fe4c58 [ZEPPELIN-5147]. ConcurrentModificationException in
Note#toJson
2fe4c58 is described below
commit 2fe4c58e1bc287adc908fd24ba24e503f8e1a9f7
Author: Jeff Zhang <[email protected]>
AuthorDate: Mon Nov 30 23:54:22 2020 +0800
[ZEPPELIN-5147]. ConcurrentModificationException in Note#toJson
### What is this PR for?
The root cause is that the note may be changing (a paragraph is being added or
removed) while Note#toJson is being called. In this PR, I change Note#paragraphs
to a CopyOnWriteArrayList; adding/removing a paragraph is not a frequent
operation, so it is OK to use CopyOnWriteArrayList here.
Besides that, this PR also makes some improvements to other parts, such as
adding more logging.
### What type of PR is it?
[Bug Fix | Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5147
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <[email protected]>
Closes #3985 from zjffdu/ZEPPELIN-5147 and squashes the following commits:
1bc9bbccd [Jeff Zhang] [ZEPPELIN-5147]. ConcurrentModificationException in
Note#toJson
---
.../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 +-
.../java/org/apache/zeppelin/client/ZSession.java | 5 +
.../zeppelin/scheduler/SchedulerFactory.java | 5 +-
.../org/apache/zeppelin/rest/NotebookRestApi.java | 3 -
.../exception/WebApplicationExceptionMapper.java | 7 +-
.../apache/zeppelin/service/NotebookService.java | 28 ++--
.../java/org/apache/zeppelin/notebook/Note.java | 172 +++++++++------------
7 files changed, 107 insertions(+), 117 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 916fe5e..837de10 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -779,7 +779,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
int updateCount = statement.getUpdateCount();
context.out.write("\n%text " +
"Query executed successfully. Affected rows : " +
- updateCount);
+ updateCount + "\n");
}
} finally {
if (resultSet != null) {
@@ -1028,6 +1028,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
try {
return Integer.valueOf(getProperty(CONCURRENT_EXECUTION_COUNT));
} catch (Exception e) {
+ LOGGER.error("Fail to parse {} with value: {}",
CONCURRENT_EXECUTION_COUNT,
+ getProperty(CONCURRENT_EXECUTION_COUNT));
return 10;
}
}
diff --git
a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
index 45bef26..f411ed6 100644
--- a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
@@ -478,6 +478,11 @@ public class ZSession {
return this;
}
+ public Builder setMaxStatement(int maxStatement) {
+ this.maxStatement = maxStatement;
+ return this;
+ }
+
public ZSession build() throws Exception {
return new ZSession(clientConfig, interpreter, intpProperties,
maxStatement);
}
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 8e76c0f..2405780 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -52,7 +52,7 @@ public class SchedulerFactory {
private SchedulerFactory() {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
int threadPoolSize =
-
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
+
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
executor =
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME,
threadPoolSize);
}
@@ -73,6 +73,7 @@ public class SchedulerFactory {
public Scheduler createOrGetFIFOScheduler(String name) {
synchronized (schedulers) {
if (!schedulers.containsKey(name)) {
+ LOGGER.info("Create FIFOScheduler: {}", name);
FIFOScheduler s = new FIFOScheduler(name);
schedulers.put(name, s);
executor.execute(s);
@@ -84,6 +85,7 @@ public class SchedulerFactory {
public Scheduler createOrGetParallelScheduler(String name, int
maxConcurrency) {
synchronized (schedulers) {
if (!schedulers.containsKey(name)) {
+ LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}",
name, maxConcurrency);
ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
schedulers.put(name, s);
executor.execute(s);
@@ -105,6 +107,7 @@ public class SchedulerFactory {
public void removeScheduler(String name) {
synchronized (schedulers) {
+ LOGGER.info("Remove scheduler: {}", name);
Scheduler s = schedulers.remove(name);
if (s != null) {
s.stop();
diff --git
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index b241138..82c77ed 100644
---
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -541,9 +541,6 @@ public class NotebookRestApi extends AbstractRestApi {
@ZeppelinApi
public Response getParagraph(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId)
throws IOException {
-
- LOGGER.info("Get paragraph {} {}", noteId, paragraphId);
-
Note note = notebook.getNote(noteId);
checkIfNoteIsNotNull(note, noteId);
checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this
paragraph");
diff --git
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
index 5615f87..3a1cb1c 100644
---
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
+++
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
@@ -25,15 +25,19 @@ import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.zeppelin.rest.message.gson.ExceptionSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Provider
public class WebApplicationExceptionMapper implements
ExceptionMapper<Throwable> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(WebApplicationException.class);
+
private final Gson gson;
public WebApplicationExceptionMapper() {
GsonBuilder gsonBuilder = new
GsonBuilder().enableComplexMapKeySerialization();
gsonBuilder.registerTypeHierarchyAdapter(
- Exception.class, new ExceptionSerializer());
+ Exception.class, new ExceptionSerializer());
this.gson = gsonBuilder.create();
}
@@ -42,6 +46,7 @@ public class WebApplicationExceptionMapper implements
ExceptionMapper<Throwable>
if (exception instanceof WebApplicationException) {
return ((WebApplicationException) exception).getResponse();
} else {
+ LOGGER.error("Error response", exception);
return Response.status(500).entity(gson.toJson(exception)).build();
}
}
diff --git
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index 4a7db3f..c6f4124 100644
---
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++
b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -650,21 +650,23 @@ public class NotebookService {
callback.onFailure(new NoteNotFoundException(noteId), context);
throw new IOException("No such note");
}
- if (note.getParagraphCount() < maxParagraph) {
- return note.addNewParagraph(context.getAutheInfo());
- } else {
- boolean removed = false;
- for (int i = 1; i< note.getParagraphCount(); ++i) {
- if (note.getParagraph(i).getStatus().isCompleted()) {
- note.removeParagraph(context.getAutheInfo().getUser(),
note.getParagraph(i).getId());
- removed = true;
- break;
+ synchronized (this) {
+ if (note.getParagraphCount() < maxParagraph) {
+ return note.addNewParagraph(context.getAutheInfo());
+ } else {
+ boolean removed = false;
+ for (int i = 1; i < note.getParagraphCount(); ++i) {
+ if (note.getParagraph(i).getStatus().isCompleted()) {
+ note.removeParagraph(context.getAutheInfo().getUser(),
note.getParagraph(i).getId());
+ removed = true;
+ break;
+ }
}
+ if (!removed) {
+ throw new IOException("All the paragraphs are not completed, unable
to find available paragraph");
+ }
+ return note.addNewParagraph(context.getAutheInfo());
}
- if (!removed) {
- throw new IOException("All the paragraphs are not completed, unable to
find available paragraph");
- }
- return note.addNewParagraph(context.getAutheInfo());
}
}
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 08bad00..cae5c3f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -65,6 +65,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
* Represent the note of Zeppelin. All the note and its paragraph operations
are done
@@ -86,16 +87,16 @@ public class Note implements JsonSerializable {
}
};
private static final Gson GSON = new GsonBuilder()
- .setPrettyPrinting()
- .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
- .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
- .registerTypeAdapterFactory(Input.TypeAdapterFactory)
- .setExclusionStrategies(strategy)
- .create();
+ .setPrettyPrinting()
+ .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+ .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
+ .registerTypeAdapterFactory(Input.TypeAdapterFactory)
+ .setExclusionStrategies(strategy)
+ .create();
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
- private List<Paragraph> paragraphs = new LinkedList<>();
+ private CopyOnWriteArrayList<Paragraph> paragraphs = new
CopyOnWriteArrayList<>();
private String name = "";
private String id;
private String defaultInterpreterGroup;
@@ -137,8 +138,8 @@ public class Note implements JsonSerializable {
}
public Note(String path, String defaultInterpreterGroup, InterpreterFactory
factory,
- InterpreterSettingManager interpreterSettingManager,
ParagraphJobListener paragraphJobListener,
- Credentials credentials, List<NoteEventListener> noteEventListener) {
+ InterpreterSettingManager interpreterSettingManager,
ParagraphJobListener paragraphJobListener,
+ Credentials credentials, List<NoteEventListener>
noteEventListener) {
setPath(path);
this.defaultInterpreterGroup = defaultInterpreterGroup;
this.interpreterFactory = factory;
@@ -268,7 +269,7 @@ public class Note implements JsonSerializable {
public String getDefaultInterpreterGroup() {
if (StringUtils.isBlank(defaultInterpreterGroup)) {
defaultInterpreterGroup = ZeppelinConfiguration.create()
-
.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
+
.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
}
return defaultInterpreterGroup;
}
@@ -399,8 +400,8 @@ public class Note implements JsonSerializable {
continue;
}
if (StringUtils.equals(noteId, angularObject.getNoteId())
- && StringUtils.equals(paragraphId, angularObject.getParagraphId())
- && StringUtils.equals(name, angularObject.getName())) {
+ && StringUtils.equals(paragraphId,
angularObject.getParagraphId())
+ && StringUtils.equals(name, angularObject.getName())) {
iter.remove();
}
}
@@ -438,8 +439,8 @@ public class Note implements JsonSerializable {
continue;
}
if (StringUtils.equals(noteId, noteIdCandidate)
- && StringUtils.equals(paragraphId, paragraphIdCandidate)
- && StringUtils.equals(name, nameCandidate)) {
+ && StringUtils.equals(paragraphId, paragraphIdCandidate)
+ && StringUtils.equals(name, nameCandidate)) {
iter.remove();
}
}
@@ -487,9 +488,7 @@ public class Note implements JsonSerializable {
LOGGER.warn("Paragraph {} has a result with exception. {}",
srcParagraph.getId(), e.getMessage());
}
- synchronized (paragraphs) {
- paragraphs.add(newParagraph);
- }
+ paragraphs.add(newParagraph);
try {
fireParagraphCreateEvent(newParagraph);
@@ -529,7 +528,7 @@ public class Note implements JsonSerializable {
// Set the default parameter configuration for the paragraph
// based on `interpreter-setting.json` config
Map<String, Object> config =
- interpreterSettingManager.getConfigSetting(defaultInterpreterGroup);
+
interpreterSettingManager.getConfigSetting(defaultInterpreterGroup);
paragraph.setConfig(config);
}
paragraph.setAuthenticationInfo(authenticationInfo);
@@ -543,9 +542,7 @@ public class Note implements JsonSerializable {
}
private void insertParagraph(Paragraph paragraph, int index) {
- synchronized (paragraphs) {
- paragraphs.add(index, paragraph);
- }
+ paragraphs.add(index, paragraph);
try {
fireParagraphCreateEvent(paragraph);
} catch (IOException e) {
@@ -562,19 +559,15 @@ public class Note implements JsonSerializable {
public Paragraph removeParagraph(String user, String paragraphId) {
removeAllAngularObjectInParagraph(user, paragraphId);
interpreterSettingManager.removeResourcesBelongsToParagraph(getId(),
paragraphId);
- synchronized (paragraphs) {
- Iterator<Paragraph> i = paragraphs.iterator();
- while (i.hasNext()) {
- Paragraph p = i.next();
- if (p.getId().equals(paragraphId)) {
- i.remove();
- try {
- fireParagraphRemoveEvent(p);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return p;
+ for (Paragraph p : paragraphs) {
+ if (p.getId().equals(paragraphId)) {
+ paragraphs.remove(p);
+ try {
+ fireParagraphRemoveEvent(p);
+ } catch (IOException e) {
+ LOGGER.error("Fail to fire ParagraphRemoveEvent", e);
}
+ return p;
}
}
return null;
@@ -587,16 +580,14 @@ public class Note implements JsonSerializable {
}
public Paragraph clearPersonalizedParagraphOutput(String paragraphId, String
user) {
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- if (!p.getId().equals(paragraphId)) {
- continue;
- }
-
- p = p.getUserParagraphMap().get(user);
- clearParagraphOutputFields(p);
- return p;
+ for (Paragraph p : paragraphs) {
+ if (!p.getId().equals(paragraphId)) {
+ continue;
}
+
+ p = p.getUserParagraphMap().get(user);
+ clearParagraphOutputFields(p);
+ return p;
}
return null;
}
@@ -608,15 +599,13 @@ public class Note implements JsonSerializable {
* @return Paragraph
*/
public Paragraph clearParagraphOutput(String paragraphId) {
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- if (!p.getId().equals(paragraphId)) {
- continue;
- }
-
- clearParagraphOutputFields(p);
- return p;
+ for (Paragraph p : paragraphs) {
+ if (!p.getId().equals(paragraphId)) {
+ continue;
}
+
+ clearParagraphOutputFields(p);
+ return p;
}
return null;
}
@@ -625,10 +614,8 @@ public class Note implements JsonSerializable {
* Clear all paragraph output of note
*/
public void clearAllParagraphOutput() {
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- p.setReturn(null, null);
- }
+ for (Paragraph p : paragraphs) {
+ p.setReturn(null, null);
}
}
@@ -651,41 +638,37 @@ public class Note implements JsonSerializable {
* when index is out of bound
*/
public void moveParagraph(String paragraphId, int index, boolean
throwWhenIndexIsOutOfBound) {
- synchronized (paragraphs) {
- int oldIndex;
- Paragraph p = null;
-
- if (index < 0 || index >= paragraphs.size()) {
- if (throwWhenIndexIsOutOfBound) {
- throw new IndexOutOfBoundsException(
- "paragraph size is " + paragraphs.size() + " , index is " +
index);
- } else {
- return;
- }
+ int oldIndex;
+ Paragraph p = null;
+
+ if (index < 0 || index >= paragraphs.size()) {
+ if (throwWhenIndexIsOutOfBound) {
+ throw new IndexOutOfBoundsException(
+ "paragraph size is " + paragraphs.size() + " , index is " +
index);
+ } else {
+ return;
}
+ }
- for (int i = 0; i < paragraphs.size(); i++) {
- if (paragraphs.get(i).getId().equals(paragraphId)) {
- oldIndex = i;
- if (oldIndex == index) {
- return;
- }
- p = paragraphs.remove(i);
+ for (int i = 0; i < paragraphs.size(); i++) {
+ if (paragraphs.get(i).getId().equals(paragraphId)) {
+ oldIndex = i;
+ if (oldIndex == index) {
+ return;
}
+ p = paragraphs.remove(i);
}
+ }
- if (p != null) {
- paragraphs.add(index, p);
- }
+ if (p != null) {
+ paragraphs.add(index, p);
}
}
public boolean isLastParagraph(String paragraphId) {
if (!paragraphs.isEmpty()) {
- synchronized (paragraphs) {
- if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId()))
{
- return true;
- }
+ if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId())) {
+ return true;
}
return false;
}
@@ -698,11 +681,9 @@ public class Note implements JsonSerializable {
}
public Paragraph getParagraph(String paragraphId) {
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- if (p.getId().equals(paragraphId)) {
- return p;
- }
+ for (Paragraph p : paragraphs) {
+ if (p.getId().equals(paragraphId)) {
+ return p;
}
}
return null;
@@ -713,9 +694,7 @@ public class Note implements JsonSerializable {
}
public Paragraph getLastParagraph() {
- synchronized (paragraphs) {
- return paragraphs.get(paragraphs.size() - 1);
- }
+ return paragraphs.get(paragraphs.size() - 1);
}
private void setParagraphMagic(Paragraph p, int index) {
@@ -876,15 +855,12 @@ public class Note implements JsonSerializable {
* Return true if there is a running or pending paragraph
*/
public boolean haveRunningOrPendingParagraphs() {
- synchronized (paragraphs) {
- for (Paragraph p : paragraphs) {
- Status status = p.getStatus();
- if (status.isRunning() || status.isPending()) {
- return true;
- }
+ for (Paragraph p : paragraphs) {
+ Status status = p.getStatus();
+ if (status.isRunning() || status.isPending()) {
+ return true;
}
}
-
return false;
}
@@ -902,8 +878,8 @@ public class Note implements JsonSerializable {
return p.completion(buffer, cursor);
}
- public List<Paragraph> getParagraphs() {
- return new ArrayList<>(this.paragraphs);
+ public CopyOnWriteArrayList<Paragraph> getParagraphs() {
+ return this.paragraphs;
}
// TODO(zjffdu) how does this used ?
@@ -948,7 +924,7 @@ public class Note implements JsonSerializable {
if (appStates != null) {
for (ApplicationState app : appStates) {
((RemoteAngularObjectRegistry) registry)
- .removeAllAndNotifyRemoteProcess(id, app.getId());
+ .removeAllAndNotifyRemoteProcess(id, app.getId());
}
}
} else {
@@ -1190,7 +1166,7 @@ public class Note implements JsonSerializable {
return false;
}
if (angularObjects != null ?
- !angularObjects.equals(note.angularObjects) : note.angularObjects !=
null) {
+ !angularObjects.equals(note.angularObjects) : note.angularObjects
!= null) {
return false;
}
if (config != null ? !config.equals(note.config) : note.config != null) {