This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3655c12  [MINOR] Refactor CronJob class (#3335)
3655c12 is described below

commit 3655c12b875884410224eca5d6155287d51916ac
Author: Jongyoul Lee <jongy...@gmail.com>
AuthorDate: Mon Apr 1 15:37:57 2019 +0900

    [MINOR] Refactor CronJob class (#3335)
---
 .../org/apache/zeppelin/rest/NotebookRestApi.java  |  10 +-
 .../org/apache/zeppelin/server/ZeppelinServer.java |   8 ++
 .../apache/zeppelin/service/NotebookService.java   |   8 +-
 .../zeppelin/service/NotebookServiceTest.java      |  31 +++--
 .../apache/zeppelin/socket/NotebookServerTest.java |   9 +-
 .../java/org/apache/zeppelin/notebook/Note.java    |   2 +-
 .../org/apache/zeppelin/notebook/Notebook.java     | 143 +--------------------
 .../zeppelin/notebook/scheduler/CronJob.java       | 102 +++++++++++++++
 .../notebook/scheduler/NoSchedulerService.java     |  33 +++++
 .../notebook/scheduler/QuartzSchedulerService.java | 137 ++++++++++++++++++++
 .../notebook/scheduler/SchedulerService.java       |  25 ++++
 .../org/apache/zeppelin/notebook/NotebookTest.java |  35 ++---
 12 files changed, 369 insertions(+), 174 deletions(-)

diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index 12a25a3..5aa776a 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -48,6 +48,7 @@ import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.AuthorizationService;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.rest.exception.BadRequestException;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.rest.exception.NoteNotFoundException;
@@ -88,6 +89,7 @@ public class NotebookRestApi extends AbstractRestApi {
   private NotebookService notebookService;
   private JobManagerService jobManagerService;
   private AuthenticationService authenticationService;
+  private SchedulerService schedulerService;
 
   @Inject
   public NotebookRestApi(
@@ -98,7 +100,8 @@ public class NotebookRestApi extends AbstractRestApi {
       AuthorizationService authorizationService,
       ZeppelinConfiguration zConf,
       AuthenticationService authenticationService,
-      JobManagerService jobManagerService) {
+      JobManagerService jobManagerService,
+      SchedulerService schedulerService) {
     super(authenticationService);
     this.notebook = notebook;
     this.notebookServer = notebookServer;
@@ -108,6 +111,7 @@ public class NotebookRestApi extends AbstractRestApi {
     this.authorizationService = authorizationService;
     this.zConf = zConf;
     this.authenticationService = authenticationService;
+    this.schedulerService = schedulerService;
   }
 
   /**
@@ -880,7 +884,7 @@ public class NotebookRestApi extends AbstractRestApi {
     config.put("cron", request.getCronString());
     config.put("releaseresource", request.getReleaseResource());
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
 
     return new JsonResponse<>(Status.OK).build();
   }
@@ -910,7 +914,7 @@ public class NotebookRestApi extends AbstractRestApi {
     config.remove("cron");
     config.remove("releaseresource");
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
 
     return new JsonResponse<>(Status.OK).build();
   }
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 3a02d40..fe85f75 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -47,6 +47,9 @@ import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.AuthorizationService;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
+import org.apache.zeppelin.notebook.scheduler.NoSchedulerService;
+import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.rest.exception.WebApplicationExceptionMapper;
 import org.apache.zeppelin.search.LuceneSearch;
 import org.apache.zeppelin.search.SearchService;
@@ -156,6 +159,11 @@ public class ZeppelinServer extends ResourceConfig {
                 .to(NoteEventListener.class)
                 .to(WebSocketServlet.class)
                 .in(Singleton.class);
+            if (conf.isZeppelinNotebookCronEnable()) {
+              
bind(QuartzSchedulerService.class).to(SchedulerService.class).in(Singleton.class);
+            } else {
+              
bind(NoSchedulerService.class).to(SchedulerService.class).in(Singleton.class);
+            }
           }
         });
 
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index 3b4931b..2b829fd 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -46,6 +46,7 @@ import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.AuthorizationService;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.rest.exception.BadRequestException;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
@@ -79,15 +80,18 @@ public class NotebookService {
   private ZeppelinConfiguration zConf;
   private Notebook notebook;
   private AuthorizationService authorizationService;
+  private SchedulerService schedulerService;
 
   @Inject
   public NotebookService(
       Notebook notebook,
       AuthorizationService authorizationService,
-      ZeppelinConfiguration zeppelinConfiguration) {
+      ZeppelinConfiguration zeppelinConfiguration,
+      SchedulerService schedulerService) {
     this.notebook = notebook;
     this.authorizationService = authorizationService;
     this.zConf = zeppelinConfiguration;
+    this.schedulerService = schedulerService;
   }
 
   public Note getHomeNote(ServiceContext context,
@@ -618,7 +622,7 @@ public class NotebookService {
     note.setName(name);
     note.setConfig(config);
     if (cronUpdated) {
-      notebook.refreshCron(note.getId());
+      schedulerService.refreshCron(note.getId());
     }
 
     notebook.saveNote(note, context.getAutheInfo());
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
index f309cca..f37418e 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java
@@ -33,14 +33,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -51,10 +50,15 @@ import 
org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
-import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.notebook.AuthorizationService;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.NoteInfo;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.repo.InMemoryNotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
+import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.search.LuceneSearch;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -63,6 +67,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
 public class NotebookServiceTest {
 
   private static NotebookService notebookService;
@@ -83,9 +90,12 @@ public class NotebookServiceTest {
     InterpreterSettingManager mockInterpreterSettingManager = 
mock(InterpreterSettingManager.class);
     InterpreterFactory mockInterpreterFactory = mock(InterpreterFactory.class);
     Interpreter mockInterpreter = mock(Interpreter.class);
-    when(mockInterpreterFactory.getInterpreter(any(), any(), any(), 
any())).thenReturn(mockInterpreter);
-    when(mockInterpreter.interpret(eq("invalid_code"), any())).thenReturn(new 
InterpreterResult(Code.ERROR, "failed"));
-    when(mockInterpreter.interpret(eq("1+1"), any())).thenReturn(new 
InterpreterResult(Code.SUCCESS, "succeed"));
+    when(mockInterpreterFactory.getInterpreter(any(), any(), any(), any()))
+        .thenReturn(mockInterpreter);
+    when(mockInterpreter.interpret(eq("invalid_code"), any()))
+        .thenReturn(new InterpreterResult(Code.ERROR, "failed"));
+    when(mockInterpreter.interpret(eq("1+1"), any()))
+        .thenReturn(new InterpreterResult(Code.SUCCESS, "succeed"));
     doCallRealMethod().when(mockInterpreter).getScheduler();
     when(mockInterpreter.getFormType()).thenReturn(FormType.NATIVE);
     ManagedInterpreterGroup mockInterpreterGroup = 
mock(ManagedInterpreterGroup.class);
@@ -94,7 +104,6 @@ public class NotebookServiceTest {
     when(mockInterpreterSetting.isUserAuthorized(any())).thenReturn(true);
     
when(mockInterpreterGroup.getInterpreterSetting()).thenReturn(mockInterpreterSetting);
     SearchService searchService = new LuceneSearch(zeppelinConfiguration);
-
     Credentials credentials = new Credentials(false, null, null);
     Notebook notebook =
         new Notebook(
@@ -105,8 +114,12 @@ public class NotebookServiceTest {
             searchService,
             credentials,
             null);
-    AuthorizationService authorizationService = new 
AuthorizationService(notebook, notebook.getConf());
-    notebookService = new NotebookService(notebook, authorizationService, 
zeppelinConfiguration);
+    AuthorizationService authorizationService =
+        new AuthorizationService(notebook, notebook.getConf());
+    SchedulerService schedulerService = new 
QuartzSchedulerService(zeppelinConfiguration, notebook);
+    notebookService =
+        new NotebookService(
+            notebook, authorizationService, zeppelinConfiguration, 
schedulerService);
 
     String interpreterName = "test";
     when(mockInterpreterSetting.getName()).thenReturn(interpreterName);
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index e0930b5..bd15bdb 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.thrift.TException;
@@ -56,7 +57,8 @@ import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
-import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
+import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
@@ -75,6 +77,7 @@ import org.junit.Test;
 public class NotebookServerTest extends AbstractTestRestApi {
   private static Notebook notebook;
   private static NotebookServer notebookServer;
+  private static SchedulerService schedulerService;
   private static NotebookService notebookService;
   private static AuthorizationService authorizationService;
   private HttpServletRequest mockRequest;
@@ -85,10 +88,12 @@ public class NotebookServerTest extends AbstractTestRestApi 
{
     AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName());
     notebook = TestUtils.getInstance(Notebook.class);
     authorizationService = new AuthorizationService(notebook, 
notebook.getConf());
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    schedulerService = new QuartzSchedulerService(conf, notebook);
     notebookServer = spy(NotebookServer.getInstance());
     notebookService =
         new NotebookService(
-            notebook, authorizationService, ZeppelinConfiguration.create());
+            notebook, authorizationService, conf, schedulerService);
 
     ConfigurationService configurationService = new 
ConfigurationService(notebook.getConf());
     when(notebookServer.getNotebookService()).thenReturn(notebookService);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 017fc7a..b5fc876 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -843,7 +843,7 @@ public class Note implements JsonSerializable {
   /**
    * Return true if there is a running or pending paragraph
    */
-  boolean haveRunningOrPendingParagraphs() {
+  public boolean haveRunningOrPendingParagraphs() {
     synchronized (paragraphs) {
       for (Paragraph p : paragraphs) {
         Status status = p.getStatus();
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index b542cdc..31d5fdc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -32,13 +32,11 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
-import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
@@ -52,16 +50,7 @@ import 
org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
-import org.quartz.CronScheduleBuilder;
-import org.quartz.CronTrigger;
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.JobKey;
 import org.quartz.SchedulerException;
-import org.quartz.TriggerBuilder;
-import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,8 +68,6 @@ public class Notebook {
   private InterpreterFactory replFactory;
   private InterpreterSettingManager interpreterSettingManager;
   private ZeppelinConfiguration conf;
-  private StdSchedulerFactory quertzSchedFact;
-  org.quartz.Scheduler quartzSched;
   private ParagraphJobListener paragraphJobListener;
   private NotebookRepo notebookRepo;
   private SearchService noteSearchService;
@@ -100,7 +87,7 @@ public class Notebook {
       InterpreterSettingManager interpreterSettingManager,
       SearchService noteSearchService,
       Credentials credentials)
-      throws IOException, SchedulerException {
+      throws IOException {
     this.noteManager = new NoteManager(notebookRepo);
     this.conf = conf;
     this.notebookRepo = notebookRepo;
@@ -108,10 +95,6 @@ public class Notebook {
     this.interpreterSettingManager = interpreterSettingManager;
     this.noteSearchService = noteSearchService;
     this.credentials = credentials;
-    quertzSchedFact = new org.quartz.impl.StdSchedulerFactory();
-    quartzSched = quertzSchedFact.getScheduler();
-    quartzSched.start();
-    CronJob.notebook = this;
 
     this.noteEventListeners.add(this.noteSearchService);
     this.noteEventListeners.add(this.interpreterSettingManager);
@@ -126,7 +109,7 @@ public class Notebook {
       SearchService noteSearchService,
       Credentials credentials,
       NoteEventListener noteEventListener)
-      throws IOException, SchedulerException {
+      throws IOException {
     this(
         conf,
         notebookRepo,
@@ -556,128 +539,6 @@ public class Notebook {
   }
 
 
-  /**
-   * Cron task for the note.
-   */
-  public static class CronJob implements org.quartz.Job {
-    public static Notebook notebook;
-
-    @Override
-    public void execute(JobExecutionContext context) throws 
JobExecutionException {
-
-      String noteId = 
context.getJobDetail().getJobDataMap().getString("noteId");
-      Note note = notebook.getNote(noteId);
-      if (note.haveRunningOrPendingParagraphs()) {
-        LOGGER.warn("execution of the cron job is skipped because there is a 
running or pending " +
-            "paragraph (note id: {})", noteId);
-        return;
-      }
-
-      if (!note.isCronSupported(notebook.getConf())) {
-        LOGGER.warn("execution of the cron job is skipped cron is not enabled 
from Zeppelin server");
-        return;
-      }
-
-      runAll(note);
-
-      boolean releaseResource = false;
-      String cronExecutingUser = null;
-      try {
-        Map<String, Object> config = note.getConfig();
-        if (config != null) {
-          if (config.containsKey("releaseresource")) {
-            releaseResource = (boolean) config.get("releaseresource");
-          }
-          cronExecutingUser = (String) config.get("cronExecutingUser");
-        }
-      } catch (ClassCastException e) {
-        LOGGER.error(e.getMessage(), e);
-      }
-      if (releaseResource) {
-        for (InterpreterSetting setting : 
notebook.getInterpreterSettingManager()
-            .getInterpreterSettings(note.getId())) {
-          try {
-            notebook.getInterpreterSettingManager().restart(setting.getId(), 
noteId,
-                    cronExecutingUser != null ? cronExecutingUser : 
"anonymous");
-          } catch (InterpreterException e) {
-            LOGGER.error("Fail to restart interpreter: " + setting.getId(), e);
-          }
-        }
-      }
-    }
-
-    void runAll(Note note) {
-      String cronExecutingUser = (String) 
note.getConfig().get("cronExecutingUser");
-      String cronExecutingRoles = (String) 
note.getConfig().get("cronExecutingRoles");
-      if (null == cronExecutingUser) {
-        cronExecutingUser = "anonymous";
-      }
-      AuthenticationInfo authenticationInfo = new AuthenticationInfo(
-          cronExecutingUser,
-          StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles,
-          null);
-      note.runAll(authenticationInfo, true);
-    }
-  }
-
-  public void refreshCron(String id) {
-    removeCron(id);
-    Note note = getNote(id);
-    if (note == null || note.isTrash()) {
-      return;
-    }
-    Map<String, Object> config = note.getConfig();
-    if (config == null) {
-      return;
-    }
-
-    if (!note.isCronSupported(getConf())) {
-      LOGGER.warn("execution of the cron job is skipped cron is not enabled 
from Zeppelin server");
-      return;
-    }
-
-    String cronExpr = (String) note.getConfig().get("cron");
-    if (cronExpr == null || cronExpr.trim().length() == 0) {
-      return;
-    }
-
-
-    JobDetail newJob =
-        JobBuilder.newJob(CronJob.class).withIdentity(id, 
"note").usingJobData("noteId", id)
-            .build();
-
-    Map<String, Object> info = note.getInfo();
-    info.put("cron", null);
-
-    CronTrigger trigger = null;
-    try {
-      trigger = TriggerBuilder.newTrigger().withIdentity("trigger_" + id, 
"note")
-          .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)).forJob(id, 
"note").build();
-    } catch (Exception e) {
-      LOGGER.error("Error", e);
-      info.put("cron", e.getMessage());
-    }
-
-
-    try {
-      if (trigger != null) {
-        quartzSched.scheduleJob(newJob, trigger);
-      }
-    } catch (SchedulerException e) {
-      LOGGER.error("Error", e);
-      info.put("cron", "Scheduler Exception");
-    }
-
-  }
-
-  public void removeCron(String id) {
-    try {
-      quartzSched.deleteJob(new JobKey(id, "note"));
-    } catch (SchedulerException e) {
-      LOGGER.error("Can't remove quertz " + id, e);
-    }
-  }
-
   public InterpreterFactory getInterpreterFactory() {
     return replFactory;
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
new file mode 100644
index 0000000..0a7b5da
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.scheduler;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cron task for the note. */
+public class CronJob implements org.quartz.Job {
+  private static final Logger logger = LoggerFactory.getLogger(CronJob.class);
+
+  @Override
+  public void execute(JobExecutionContext context) {
+    JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+
+    Notebook notebook = (Notebook) jobDataMap.get("notebook");
+    String noteId = jobDataMap.getString("noteId");
+    Note note = notebook.getNote(noteId);
+    if (note.haveRunningOrPendingParagraphs()) {
+      logger.warn(
+          "execution of the cron job is skipped because there is a running or 
pending "
+              + "paragraph (note id: {})",
+          noteId);
+      return;
+    }
+
+    if (!note.isCronSupported(notebook.getConf())) {
+      logger.warn("execution of the cron job is skipped cron is not enabled 
from Zeppelin server");
+      return;
+    }
+
+    runAll(note);
+
+    boolean releaseResource = false;
+    String cronExecutingUser = null;
+    try {
+      Map<String, Object> config = note.getConfig();
+      if (config != null) {
+        if (config.containsKey("releaseresource")) {
+          releaseResource = (boolean) config.get("releaseresource");
+        }
+        cronExecutingUser = (String) config.get("cronExecutingUser");
+      }
+    } catch (ClassCastException e) {
+      logger.error(e.getMessage(), e);
+    }
+    if (releaseResource) {
+      for (InterpreterSetting setting :
+          
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId())) {
+        try {
+          notebook
+              .getInterpreterSettingManager()
+              .restart(
+                  setting.getId(),
+                  noteId,
+                  cronExecutingUser != null ? cronExecutingUser : "anonymous");
+        } catch (InterpreterException e) {
+          logger.error("Fail to restart interpreter: " + setting.getId(), e);
+        }
+      }
+    }
+  }
+
+  void runAll(Note note) {
+    String cronExecutingUser = (String) 
note.getConfig().get("cronExecutingUser");
+    String cronExecutingRoles = (String) 
note.getConfig().get("cronExecutingRoles");
+    if (null == cronExecutingUser) {
+      cronExecutingUser = "anonymous";
+    }
+    AuthenticationInfo authenticationInfo =
+        new AuthenticationInfo(
+            cronExecutingUser,
+            StringUtils.isEmpty(cronExecutingRoles) ? null : 
cronExecutingRoles,
+            null);
+    note.runAll(authenticationInfo, true);
+  }
+}
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
new file mode 100644
index 0000000..0263ec2
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.scheduler;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class NoSchedulerService implements SchedulerService {
+  @Override
+  public void refreshCron(String noteId) {
+    // Do nothing
+  }
+
+  @Override
+  public Set<?> getJobs() {
+    return Collections.emptySet();
+  }
+}
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
new file mode 100644
index 0000000..ee174d0
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.scheduler;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuartzSchedulerService implements SchedulerService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QuartzSchedulerService.class);
+
+  private final ZeppelinConfiguration zeppelinConfiguration;
+  private final Notebook notebook;
+  private final Scheduler scheduler;
+
+  @Inject
+  public QuartzSchedulerService(ZeppelinConfiguration zeppelinConfiguration, 
Notebook notebook)
+      throws SchedulerException {
+    this.zeppelinConfiguration = zeppelinConfiguration;
+    this.notebook = notebook;
+    this.scheduler = new StdSchedulerFactory().getScheduler();
+    this.scheduler.start();
+  }
+
+  @Override
+  public void refreshCron(String noteId) {
+    removeCron(noteId);
+    Note note = notebook.getNote(noteId);
+    if (note == null || note.isTrash()) {
+      return;
+    }
+    Map<String, Object> config = note.getConfig();
+    if (config == null) {
+      return;
+    }
+
+    if (!note.isCronSupported(zeppelinConfiguration)) {
+      LOGGER.warn("execution of the cron job is skipped cron is not enabled 
from Zeppelin server");
+      return;
+    }
+
+    String cronExpr = (String) note.getConfig().get("cron");
+    if (cronExpr == null || cronExpr.trim().length() == 0) {
+      return;
+    }
+
+    JobDataMap jobDataMap =
+        new JobDataMap() {
+          {
+            put("noteId", noteId);
+            put("notebook", notebook);
+          }
+        };
+    JobDetail newJob =
+        JobBuilder.newJob(CronJob.class)
+            .withIdentity(noteId, "note")
+            .setJobData(jobDataMap)
+            .build();
+
+    Map<String, Object> info = note.getInfo();
+    info.put("cron", null);
+
+    CronTrigger trigger = null;
+    try {
+      trigger =
+          TriggerBuilder.newTrigger()
+              .withIdentity("trigger_" + noteId, "note")
+              .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr))
+              .forJob(noteId, "note")
+              .build();
+    } catch (Exception e) {
+      LOGGER.error("Error", e);
+      info.put("cron", e.getMessage());
+    }
+
+    try {
+      if (trigger != null) {
+        scheduler.scheduleJob(newJob, trigger);
+      }
+    } catch (SchedulerException e) {
+      LOGGER.error("Error", e);
+      info.put("cron", "Scheduler Exception");
+    }
+  }
+
+  @Override
+  public Set<?> getJobs() {
+    try {
+      return scheduler.getJobKeys(GroupMatcher.anyGroup());
+    } catch (SchedulerException e) {
+      LOGGER.error("Error while getting jobKeys", e);
+      return Collections.emptySet();
+    }
+  }
+
+  private void removeCron(String id) {
+    try {
+      scheduler.deleteJob(new JobKey(id, "note"));
+    } catch (SchedulerException e) {
+      LOGGER.error("Can't remove quertz " + id, e);
+    }
+  }
+}
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
new file mode 100644
index 0000000..b7c87af
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook.scheduler;
+
+import java.util.Set;
+
+public interface SchedulerService {
+  void refreshCron(String noteId);
+  Set<?> getJobs();
+}
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index cd2fbf3..1629ab2 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -34,6 +34,8 @@ import org.apache.zeppelin.notebook.repo.InMemoryNotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl;
+import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
@@ -44,7 +46,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.quartz.SchedulerException;
-import org.quartz.impl.matchers.GroupMatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonatype.aether.RepositoryException;
@@ -82,6 +83,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
   private Credentials credentials;
   private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS;
   private StatusChangedListener afterStatusChangedListener;
+  private SchedulerService schedulerService;
 
   @Before
   public void setUp() throws Exception {
@@ -97,6 +99,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
             credentials, null);
     authorizationService = new AuthorizationService(notebook, 
notebook.getConf());
     notebook.setParagraphJobListener(this);
+    schedulerService = new QuartzSchedulerService(conf, notebook);
 
   }
 
@@ -106,7 +109,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
   }
 
   @Test
-  public void testRevisionSupported() throws IOException, SchedulerException {
+  public void testRevisionSupported() throws IOException {
     NotebookRepo notebookRepo;
     Notebook notebook;
 
@@ -477,13 +480,13 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     config.put("enabled", true);
     config.put("cron", "* * * * * ?");
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
     Thread.sleep(2 * 1000);
 
     // remove cron scheduler.
     config.put("cron", null);
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
     Thread.sleep(2 * 1000);
     dateFinished = p.getDateFinished();
     assertNotNull(dateFinished);
@@ -511,13 +514,13 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     config.put("enabled", true);
     config.put("cron", "* * * * * ?");
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
     Thread.sleep(2 * 1000);
 
     // remove cron scheduler.
     config.put("cron", null);
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
     Thread.sleep(2 * 1000);
 
     // check if the executions of the running and pending paragraphs were 
skipped
@@ -559,7 +562,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     config.put("enabled", true);
     config.put("cron", cron);
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
   }
 
   @Test
@@ -644,7 +647,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
 
   private void terminateScheduledNote(Note note) throws IOException {
     note.getConfig().remove("cron");
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
     notebook.removeNote(note.getId(), anonymous);
   }
 
@@ -669,7 +672,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     config.put("cron", "1/3 * * * * ?");
     config.put("releaseresource", true);
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
 
 
     RemoteInterpreter mock1 = (RemoteInterpreter) 
interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), "mock1", 
"test");
@@ -689,7 +692,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     // remove cron scheduler.
     config.put("cron", null);
     note.setConfig(config);
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
 
     // make sure all paragraph has been executed
     assertNotNull(p.getDateFinished());
@@ -748,7 +751,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     }
 
     // refresh the cron schedule
-    notebook.refreshCron(cronNote.getId());
+    schedulerService.refreshCron(cronNote.getId());
 
     // wait until cronNoteInterpreter is opened
     while (!cronNoteInterpreter.isOpened()) {
@@ -773,7 +776,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
         put("releaseresource", null);
       }
     });
-    notebook.refreshCron(cronNote.getId());
+    schedulerService.refreshCron(cronNote.getId());
 
     // remove notebooks
     notebook.removeNote(cronNote.getId(), anonymous);
@@ -789,15 +792,15 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
     config.put("cron", "* * * * * ?");
     note.setConfig(config);
 
-    final int jobsBeforeRefresh = 
notebook.quartzSched.getJobKeys(GroupMatcher.anyGroup()).size();
-    notebook.refreshCron(note.getId());
-    final int jobsAfterRefresh = 
notebook.quartzSched.getJobKeys(GroupMatcher.anyGroup()).size();
+    final int jobsBeforeRefresh = schedulerService.getJobs().size();
+    schedulerService.refreshCron(note.getId());
+    final int jobsAfterRefresh = schedulerService.getJobs().size();
 
     assertEquals(jobsBeforeRefresh, jobsAfterRefresh);
 
     // remove cron scheduler.
     config.remove("cron");
-    notebook.refreshCron(note.getId());
+    schedulerService.refreshCron(note.getId());
     notebook.removeNote(note.getId(), anonymous);
   }
 

Reply via email to