This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 3655c12 [MINOR] Refactor CronJob class (#3335) 3655c12 is described below commit 3655c12b875884410224eca5d6155287d51916ac Author: Jongyoul Lee <jongy...@gmail.com> AuthorDate: Mon Apr 1 15:37:57 2019 +0900 [MINOR] Refactor CronJob class (#3335) --- .../org/apache/zeppelin/rest/NotebookRestApi.java | 10 +- .../org/apache/zeppelin/server/ZeppelinServer.java | 8 ++ .../apache/zeppelin/service/NotebookService.java | 8 +- .../zeppelin/service/NotebookServiceTest.java | 31 +++-- .../apache/zeppelin/socket/NotebookServerTest.java | 9 +- .../java/org/apache/zeppelin/notebook/Note.java | 2 +- .../org/apache/zeppelin/notebook/Notebook.java | 143 +-------------------- .../zeppelin/notebook/scheduler/CronJob.java | 102 +++++++++++++++ .../notebook/scheduler/NoSchedulerService.java | 33 +++++ .../notebook/scheduler/QuartzSchedulerService.java | 137 ++++++++++++++++++++ .../notebook/scheduler/SchedulerService.java | 25 ++++ .../org/apache/zeppelin/notebook/NotebookTest.java | 35 ++--- 12 files changed, 369 insertions(+), 174 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 12a25a3..5aa776a 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -48,6 +48,7 @@ import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.AuthorizationService; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; import org.apache.zeppelin.rest.exception.BadRequestException; import org.apache.zeppelin.rest.exception.ForbiddenException; import org.apache.zeppelin.rest.exception.NoteNotFoundException; @@ -88,6 +89,7 @@ public class NotebookRestApi extends AbstractRestApi { private NotebookService notebookService; private JobManagerService jobManagerService; private AuthenticationService authenticationService; + private SchedulerService schedulerService; @Inject public NotebookRestApi( @@ -98,7 +100,8 @@ public class NotebookRestApi extends AbstractRestApi { AuthorizationService authorizationService, ZeppelinConfiguration zConf, AuthenticationService authenticationService, - JobManagerService jobManagerService) { + JobManagerService jobManagerService, + SchedulerService schedulerService) { super(authenticationService); this.notebook = notebook; this.notebookServer = notebookServer; @@ -108,6 +111,7 @@ public class NotebookRestApi extends AbstractRestApi { this.authorizationService = authorizationService; this.zConf = zConf; this.authenticationService = authenticationService; + this.schedulerService = schedulerService; } /** @@ -880,7 +884,7 @@ public class NotebookRestApi extends AbstractRestApi { config.put("cron", request.getCronString()); config.put("releaseresource", request.getReleaseResource()); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); return new JsonResponse<>(Status.OK).build(); } @@ -910,7 +914,7 @@ public class NotebookRestApi extends AbstractRestApi { config.remove("cron"); config.remove("releaseresource"); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); return new JsonResponse<>(Status.OK).build(); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 3a02d40..fe85f75 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -47,6 +47,9 @@ import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.AuthorizationService; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; +import org.apache.zeppelin.notebook.scheduler.NoSchedulerService; +import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; import org.apache.zeppelin.rest.exception.WebApplicationExceptionMapper; import org.apache.zeppelin.search.LuceneSearch; import org.apache.zeppelin.search.SearchService; @@ -156,6 +159,11 @@ public class ZeppelinServer extends ResourceConfig { .to(NoteEventListener.class) .to(WebSocketServlet.class) .in(Singleton.class); + if (conf.isZeppelinNotebookCronEnable()) { + bind(QuartzSchedulerService.class).to(SchedulerService.class).in(Singleton.class); + } else { + bind(NoSchedulerService.class).to(SchedulerService.class).in(Singleton.class); + } } }); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index 3b4931b..2b829fd 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -46,6 +46,7 @@ import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.AuthorizationService; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.rest.exception.BadRequestException; import org.apache.zeppelin.rest.exception.ForbiddenException; @@ -79,15 +80,18 @@ public class NotebookService { private ZeppelinConfiguration zConf; private Notebook notebook; private AuthorizationService authorizationService; + private SchedulerService schedulerService; @Inject public NotebookService( Notebook notebook, AuthorizationService authorizationService, - ZeppelinConfiguration zeppelinConfiguration) { + ZeppelinConfiguration zeppelinConfiguration, + SchedulerService schedulerService) { this.notebook = notebook; this.authorizationService = authorizationService; this.zConf = zeppelinConfiguration; + this.schedulerService = schedulerService; } public Note getHomeNote(ServiceContext context, @@ -618,7 +622,7 @@ public class NotebookService { note.setName(name); note.setConfig(config); if (cronUpdated) { - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); } notebook.saveNote(note, context.getAutheInfo()); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java index f309cca..f37418e 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/NotebookServiceTest.java @@ -33,14 +33,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; + import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.Interpreter; @@ -51,10 +50,15 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.notebook.AuthorizationService; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.repo.InMemoryNotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; import org.apache.zeppelin.search.LuceneSearch; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.user.AuthenticationInfo; @@ -63,6 +67,9 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + public class NotebookServiceTest { private static NotebookService notebookService; @@ -83,9 +90,12 @@ public class NotebookServiceTest { InterpreterSettingManager mockInterpreterSettingManager = mock(InterpreterSettingManager.class); InterpreterFactory mockInterpreterFactory = mock(InterpreterFactory.class); Interpreter mockInterpreter = mock(Interpreter.class); - when(mockInterpreterFactory.getInterpreter(any(), any(), any(), any())).thenReturn(mockInterpreter); - when(mockInterpreter.interpret(eq("invalid_code"), any())).thenReturn(new InterpreterResult(Code.ERROR, "failed")); - when(mockInterpreter.interpret(eq("1+1"), any())).thenReturn(new InterpreterResult(Code.SUCCESS, "succeed")); + when(mockInterpreterFactory.getInterpreter(any(), any(), any(), any())) + .thenReturn(mockInterpreter); + when(mockInterpreter.interpret(eq("invalid_code"), any())) + .thenReturn(new InterpreterResult(Code.ERROR, "failed")); + when(mockInterpreter.interpret(eq("1+1"), any())) + .thenReturn(new InterpreterResult(Code.SUCCESS, "succeed")); doCallRealMethod().when(mockInterpreter).getScheduler(); when(mockInterpreter.getFormType()).thenReturn(FormType.NATIVE); ManagedInterpreterGroup mockInterpreterGroup = mock(ManagedInterpreterGroup.class); @@ -94,7 +104,6 @@ public class NotebookServiceTest { when(mockInterpreterSetting.isUserAuthorized(any())).thenReturn(true); when(mockInterpreterGroup.getInterpreterSetting()).thenReturn(mockInterpreterSetting); SearchService searchService = new LuceneSearch(zeppelinConfiguration); - Credentials credentials = new Credentials(false, null, null); Notebook notebook = new Notebook( @@ -105,8 +114,12 @@ public class NotebookServiceTest { searchService, credentials, null); - AuthorizationService authorizationService = new AuthorizationService(notebook, notebook.getConf()); - notebookService = new NotebookService(notebook, authorizationService, zeppelinConfiguration); + AuthorizationService authorizationService = + new AuthorizationService(notebook, notebook.getConf()); + SchedulerService schedulerService = new QuartzSchedulerService(zeppelinConfiguration, notebook); + notebookService = + new NotebookService( + notebook, authorizationService, zeppelinConfiguration, schedulerService); String interpreterName = "test"; when(mockInterpreterSetting.getName()).thenReturn(interpreterName); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index e0930b5..bd15bdb 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; + import javax.servlet.http.HttpServletRequest; import org.apache.thrift.TException; @@ -56,7 +57,8 @@ import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; -import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication; +import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; import org.apache.zeppelin.rest.AbstractTestRestApi; @@ -75,6 +77,7 @@ import org.junit.Test; public class NotebookServerTest extends AbstractTestRestApi { private static Notebook notebook; private static NotebookServer notebookServer; + private static SchedulerService schedulerService; private static NotebookService notebookService; private static AuthorizationService authorizationService; private HttpServletRequest mockRequest; @@ -85,10 +88,12 @@ public class NotebookServerTest extends AbstractTestRestApi { AbstractTestRestApi.startUp(NotebookServerTest.class.getSimpleName()); notebook = TestUtils.getInstance(Notebook.class); authorizationService = new AuthorizationService(notebook, notebook.getConf()); + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + schedulerService = new QuartzSchedulerService(conf, notebook); notebookServer = spy(NotebookServer.getInstance()); notebookService = new NotebookService( - notebook, authorizationService, ZeppelinConfiguration.create()); + notebook, authorizationService, conf, schedulerService); ConfigurationService configurationService = new ConfigurationService(notebook.getConf()); when(notebookServer.getNotebookService()).thenReturn(notebookService); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 017fc7a..b5fc876 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -843,7 +843,7 @@ public class Note implements JsonSerializable { /** * Return true if there is a running or pending paragraph */ - boolean haveRunningOrPendingParagraphs() { + public boolean haveRunningOrPendingParagraphs() { synchronized (paragraphs) { for (Paragraph p : paragraphs) { Status status = p.getStatus(); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index b542cdc..31d5fdc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -32,13 +32,11 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import javax.inject.Inject; -import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterNotFoundException; @@ -52,16 +50,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; -import org.quartz.CronScheduleBuilder; -import org.quartz.CronTrigger; -import org.quartz.JobBuilder; -import org.quartz.JobDetail; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.quartz.JobKey; import org.quartz.SchedulerException; -import org.quartz.TriggerBuilder; -import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,8 +68,6 @@ public class Notebook { private InterpreterFactory replFactory; private InterpreterSettingManager interpreterSettingManager; private ZeppelinConfiguration conf; - private StdSchedulerFactory quertzSchedFact; - org.quartz.Scheduler quartzSched; private ParagraphJobListener paragraphJobListener; private NotebookRepo notebookRepo; private SearchService noteSearchService; @@ -100,7 +87,7 @@ public class Notebook { InterpreterSettingManager interpreterSettingManager, SearchService noteSearchService, Credentials credentials) - throws IOException, SchedulerException { + throws IOException { this.noteManager = new NoteManager(notebookRepo); this.conf = conf; this.notebookRepo = notebookRepo; @@ -108,10 +95,6 @@ public class Notebook { this.interpreterSettingManager = interpreterSettingManager; this.noteSearchService = noteSearchService; this.credentials = credentials; - quertzSchedFact = new org.quartz.impl.StdSchedulerFactory(); - quartzSched = quertzSchedFact.getScheduler(); - quartzSched.start(); - CronJob.notebook = this; this.noteEventListeners.add(this.noteSearchService); this.noteEventListeners.add(this.interpreterSettingManager); @@ -126,7 +109,7 @@ public class Notebook { SearchService noteSearchService, Credentials credentials, NoteEventListener noteEventListener) - throws IOException, SchedulerException { + throws IOException { this( conf, notebookRepo, @@ -556,128 +539,6 @@ public class Notebook { } - /** - * Cron task for the note. - */ - public static class CronJob implements org.quartz.Job { - public static Notebook notebook; - - @Override - public void execute(JobExecutionContext context) throws JobExecutionException { - - String noteId = context.getJobDetail().getJobDataMap().getString("noteId"); - Note note = notebook.getNote(noteId); - if (note.haveRunningOrPendingParagraphs()) { - LOGGER.warn("execution of the cron job is skipped because there is a running or pending " + - "paragraph (note id: {})", noteId); - return; - } - - if (!note.isCronSupported(notebook.getConf())) { - LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); - return; - } - - runAll(note); - - boolean releaseResource = false; - String cronExecutingUser = null; - try { - Map<String, Object> config = note.getConfig(); - if (config != null) { - if (config.containsKey("releaseresource")) { - releaseResource = (boolean) config.get("releaseresource"); - } - cronExecutingUser = (String) config.get("cronExecutingUser"); - } - } catch (ClassCastException e) { - LOGGER.error(e.getMessage(), e); - } - if (releaseResource) { - for (InterpreterSetting setting : notebook.getInterpreterSettingManager() - .getInterpreterSettings(note.getId())) { - try { - notebook.getInterpreterSettingManager().restart(setting.getId(), noteId, - cronExecutingUser != null ? cronExecutingUser : "anonymous"); - } catch (InterpreterException e) { - LOGGER.error("Fail to restart interpreter: " + setting.getId(), e); - } - } - } - } - - void runAll(Note note) { - String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser"); - String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles"); - if (null == cronExecutingUser) { - cronExecutingUser = "anonymous"; - } - AuthenticationInfo authenticationInfo = new AuthenticationInfo( - cronExecutingUser, - StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles, - null); - note.runAll(authenticationInfo, true); - } - } - - public void refreshCron(String id) { - removeCron(id); - Note note = getNote(id); - if (note == null || note.isTrash()) { - return; - } - Map<String, Object> config = note.getConfig(); - if (config == null) { - return; - } - - if (!note.isCronSupported(getConf())) { - LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); - return; - } - - String cronExpr = (String) note.getConfig().get("cron"); - if (cronExpr == null || cronExpr.trim().length() == 0) { - return; - } - - - JobDetail newJob = - JobBuilder.newJob(CronJob.class).withIdentity(id, "note").usingJobData("noteId", id) - .build(); - - Map<String, Object> info = note.getInfo(); - info.put("cron", null); - - CronTrigger trigger = null; - try { - trigger = TriggerBuilder.newTrigger().withIdentity("trigger_" + id, "note") - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)).forJob(id, "note").build(); - } catch (Exception e) { - LOGGER.error("Error", e); - info.put("cron", e.getMessage()); - } - - - try { - if (trigger != null) { - quartzSched.scheduleJob(newJob, trigger); - } - } catch (SchedulerException e) { - LOGGER.error("Error", e); - info.put("cron", "Scheduler Exception"); - } - - } - - public void removeCron(String id) { - try { - quartzSched.deleteJob(new JobKey(id, "note")); - } catch (SchedulerException e) { - LOGGER.error("Can't remove quertz " + id, e); - } - } - public InterpreterFactory getInterpreterFactory() { return replFactory; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java new file mode 100644 index 0000000..0a7b5da --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/CronJob.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.scheduler; + +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Cron task for the note. */ +public class CronJob implements org.quartz.Job { + private static final Logger logger = LoggerFactory.getLogger(CronJob.class); + + @Override + public void execute(JobExecutionContext context) { + JobDataMap jobDataMap = context.getJobDetail().getJobDataMap(); + + Notebook notebook = (Notebook) jobDataMap.get("notebook"); + String noteId = jobDataMap.getString("noteId"); + Note note = notebook.getNote(noteId); + if (note.haveRunningOrPendingParagraphs()) { + logger.warn( + "execution of the cron job is skipped because there is a running or pending " + + "paragraph (note id: {})", + noteId); + return; + } + + if (!note.isCronSupported(notebook.getConf())) { + logger.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + return; + } + + runAll(note); + + boolean releaseResource = false; + String cronExecutingUser = null; + try { + Map<String, Object> config = note.getConfig(); + if (config != null) { + if (config.containsKey("releaseresource")) { + releaseResource = (boolean) config.get("releaseresource"); + } + cronExecutingUser = (String) config.get("cronExecutingUser"); + } + } catch (ClassCastException e) { + logger.error(e.getMessage(), e); + } + if (releaseResource) { + for (InterpreterSetting setting : + notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId())) { + try { + notebook + .getInterpreterSettingManager() + .restart( + setting.getId(), + noteId, + cronExecutingUser != null ? cronExecutingUser : "anonymous"); + } catch (InterpreterException e) { + logger.error("Fail to restart interpreter: " + setting.getId(), e); + } + } + } + } + + void runAll(Note note) { + String cronExecutingUser = (String) note.getConfig().get("cronExecutingUser"); + String cronExecutingRoles = (String) note.getConfig().get("cronExecutingRoles"); + if (null == cronExecutingUser) { + cronExecutingUser = "anonymous"; + } + AuthenticationInfo authenticationInfo = + new AuthenticationInfo( + cronExecutingUser, + StringUtils.isEmpty(cronExecutingRoles) ? null : cronExecutingRoles, + null); + note.runAll(authenticationInfo, true); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java new file mode 100644 index 0000000..0263ec2 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/NoSchedulerService.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.scheduler; + +import java.util.Collections; +import java.util.Set; + +public class NoSchedulerService implements SchedulerService { + @Override + public void refreshCron(String noteId) { + // Do nothing + } + + @Override + public Set<?> getJobs() { + return Collections.emptySet(); + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java new file mode 100644 index 0000000..ee174d0 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/QuartzSchedulerService.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.scheduler; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import javax.inject.Inject; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.JobBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.quartz.impl.matchers.GroupMatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuartzSchedulerService implements SchedulerService { + private static final Logger LOGGER = LoggerFactory.getLogger(QuartzSchedulerService.class); + + private final ZeppelinConfiguration zeppelinConfiguration; + private final Notebook notebook; + private final Scheduler scheduler; + + @Inject + public QuartzSchedulerService(ZeppelinConfiguration zeppelinConfiguration, Notebook notebook) + throws SchedulerException { + this.zeppelinConfiguration = zeppelinConfiguration; + this.notebook = notebook; + this.scheduler = new StdSchedulerFactory().getScheduler(); + this.scheduler.start(); + } + + @Override + public void refreshCron(String noteId) { + removeCron(noteId); + Note note = notebook.getNote(noteId); + if (note == null || note.isTrash()) { + return; + } + Map<String, Object> config = note.getConfig(); + if (config == null) { + return; + } + + if (!note.isCronSupported(zeppelinConfiguration)) { + LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + return; + } + + String cronExpr = (String) note.getConfig().get("cron"); + if (cronExpr == null || cronExpr.trim().length() == 0) { + return; + } + + JobDataMap jobDataMap = + new JobDataMap() { + { + put("noteId", noteId); + put("notebook", notebook); + } + }; + JobDetail newJob = + JobBuilder.newJob(CronJob.class) + .withIdentity(noteId, "note") + .setJobData(jobDataMap) + .build(); + + Map<String, Object> info = note.getInfo(); + info.put("cron", null); + + CronTrigger trigger = null; + try { + trigger = + TriggerBuilder.newTrigger() + .withIdentity("trigger_" + noteId, "note") + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)) + .forJob(noteId, "note") + .build(); + } catch (Exception e) { + LOGGER.error("Error", e); + info.put("cron", e.getMessage()); + } + + try { + if (trigger != null) { + scheduler.scheduleJob(newJob, trigger); + } + } catch (SchedulerException e) { + LOGGER.error("Error", e); + info.put("cron", "Scheduler Exception"); + } + } + + @Override + public Set<?> getJobs() { + try { + return scheduler.getJobKeys(GroupMatcher.anyGroup()); + } catch (SchedulerException e) { + LOGGER.error("Error while getting jobKeys", e); + return Collections.emptySet(); + } + } + + private void removeCron(String id) { + try { + scheduler.deleteJob(new JobKey(id, "note")); + } catch (SchedulerException e) { + LOGGER.error("Can't remove quertz " + id, e); + } + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java new file mode 100644 index 0000000..b7c87af --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/scheduler/SchedulerService.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.scheduler; + +import java.util.Set; + +public interface SchedulerService { + void refreshCron(String noteId); + Set<?> getJobs(); +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index cd2fbf3..1629ab2 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -34,6 +34,8 @@ import org.apache.zeppelin.notebook.repo.InMemoryNotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; +import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService; +import org.apache.zeppelin.notebook.scheduler.SchedulerService; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; @@ -44,7 +46,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.quartz.SchedulerException; -import org.quartz.impl.matchers.GroupMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.RepositoryException; @@ -82,6 +83,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo private Credentials credentials; private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS; private StatusChangedListener afterStatusChangedListener; + private SchedulerService schedulerService; @Before public void setUp() throws Exception { @@ -97,6 +99,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo credentials, null); authorizationService = new AuthorizationService(notebook, notebook.getConf()); notebook.setParagraphJobListener(this); + schedulerService = new QuartzSchedulerService(conf, notebook); } @@ -106,7 +109,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo } @Test - public void testRevisionSupported() throws IOException, SchedulerException { + public void testRevisionSupported() throws IOException { NotebookRepo notebookRepo; Notebook notebook; @@ -477,13 +480,13 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo config.put("enabled", true); config.put("cron", "* * * * * ?"); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); Thread.sleep(2 * 1000); // remove cron scheduler. config.put("cron", null); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); Thread.sleep(2 * 1000); dateFinished = p.getDateFinished(); assertNotNull(dateFinished); @@ -511,13 +514,13 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo config.put("enabled", true); config.put("cron", "* * * * * ?"); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); Thread.sleep(2 * 1000); // remove cron scheduler. config.put("cron", null); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); Thread.sleep(2 * 1000); // check if the executions of the running and pending paragraphs were skipped @@ -559,7 +562,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo config.put("enabled", true); config.put("cron", cron); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); } @Test @@ -644,7 +647,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo private void terminateScheduledNote(Note note) throws IOException { note.getConfig().remove("cron"); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); notebook.removeNote(note.getId(), anonymous); } @@ -669,7 +672,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo config.put("cron", "1/3 * * * * ?"); config.put("releaseresource", true); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); RemoteInterpreter mock1 = (RemoteInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), "mock1", "test"); @@ -689,7 +692,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo // remove cron scheduler. config.put("cron", null); note.setConfig(config); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); // make sure all paragraph has been executed assertNotNull(p.getDateFinished()); @@ -748,7 +751,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo } // refresh the cron schedule - notebook.refreshCron(cronNote.getId()); + schedulerService.refreshCron(cronNote.getId()); // wait until cronNoteInterpreter is opened while (!cronNoteInterpreter.isOpened()) { @@ -773,7 +776,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo put("releaseresource", null); } }); - notebook.refreshCron(cronNote.getId()); + schedulerService.refreshCron(cronNote.getId()); // remove notebooks notebook.removeNote(cronNote.getId(), anonymous); @@ -789,15 +792,15 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo config.put("cron", "* * * * * ?"); note.setConfig(config); - final int jobsBeforeRefresh = notebook.quartzSched.getJobKeys(GroupMatcher.anyGroup()).size(); - notebook.refreshCron(note.getId()); - final int jobsAfterRefresh = notebook.quartzSched.getJobKeys(GroupMatcher.anyGroup()).size(); + final int jobsBeforeRefresh = schedulerService.getJobs().size(); + schedulerService.refreshCron(note.getId()); + final int jobsAfterRefresh = schedulerService.getJobs().size(); assertEquals(jobsBeforeRefresh, jobsAfterRefresh); // remove cron scheduler. config.remove("cron"); - notebook.refreshCron(note.getId()); + schedulerService.refreshCron(note.getId()); notebook.removeNote(note.getId(), anonymous); }