gongyining created ZEPPELIN-5510:
------------------------------------
Summary: There are some logic problems, which may lead to thread
leaks
Key: ZEPPELIN-5510
URL: https://issues.apache.org/jira/browse/ZEPPELIN-5510
Project: Zeppelin
Issue Type: Improvement
Components: Interpreters
Affects Versions: 0.10.0
Reporter: gongyining
{code:java}
public void addJob(InterpreterContext context, JobClient jobClient) {
  String paragraphId = context.getParagraphId();
  JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
  long checkInterval = Long.parseLong(
      properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
  FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl,
      jobClient.getJobID(), context, checkInterval);
  thread.setName("JobProgressPoller-Thread-" + paragraphId);
  thread.start();
  this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
  if (previousJobClient != null) {
    LOGGER.warn("There's another Job {} that is associated with paragraph {}",
        jobClient.getJobID(), paragraphId);
  }
}
{code}
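This code has a logic problem that may cause a thread leak: a new
FlinkJobProgressPoller thread is started on every call, even when a job is
already associated with the paragraph. I think it should be changed to this: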
{code:java}
public void addJob(InterpreterContext context, JobClient jobClient) {
  String paragraphId = context.getParagraphId();
  JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
  if (previousJobClient != null) {
    LOGGER.warn("There's another Job {} that is associated with paragraph {}",
        jobClient.getJobID(), paragraphId);
    return;
  }
  long checkInterval = Long.parseLong(
      properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
  FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl,
      jobClient.getJobID(), context, checkInterval);
  thread.setName("JobProgressPoller-Thread-" + paragraphId);
  thread.start();
  this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
}
{code}
If previousJobClient is not null, we shouldn't start another poller thread.
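
To illustrate the guard in isolation, here is a minimal, self-contained sketch. It is not Zeppelin code: the class PollerGuardSketch, the use of plain String job IDs, and the sleeping stand-in thread are all assumptions made for the example. It only shows that returning early when a previous entry exists keeps one poller thread per paragraph.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the job manager; names are illustrative only.
public class PollerGuardSketch {
    // paragraphId -> jobId (stands in for the jobs map in the issue)
    private final Map<String, String> jobs = new ConcurrentHashMap<>();
    // jobId -> poller thread (stands in for jobProgressPollerMap)
    private final Map<String, Thread> pollers = new ConcurrentHashMap<>();

    /** Registers a job; returns true only if a new poller thread was started. */
    public boolean addJob(String paragraphId, String jobId) {
        String previousJobId = jobs.put(paragraphId, jobId);
        if (previousJobId != null) {
            // A job is already associated with this paragraph: do not start
            // a second poller thread.
            return false;
        }
        // Stand-in for FlinkJobProgressPoller: sleeps until interrupted.
        Thread poller = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setName("JobProgressPoller-Thread-" + paragraphId);
        poller.setDaemon(true); // lets the JVM exit in this demo
        poller.start();
        pollers.put(jobId, poller);
        return true;
    }

    /** Number of poller threads that have been registered. */
    public int pollerCount() {
        return pollers.size();
    }

    public static void main(String[] args) {
        PollerGuardSketch registry = new PollerGuardSketch();
        System.out.println(registry.addJob("paragraph_1", "job-a")); // true
        System.out.println(registry.addJob("paragraph_1", "job-b")); // false
        System.out.println(registry.pollerCount());                  // 1
    }
}
```

Note that in this sketch, as in the proposed change, the map entry for the paragraph is still replaced by the second job even though no poller is started for it. Whether that is the desired behavior is a separate question from the thread leak.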
--
This message was sent by Atlassian Jira
(v8.3.4#803005)