This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
new 90a33e0 [cherry-pick-2.0.3] cache manager (#7882)
90a33e0 is described below
commit 90a33e06eb500e148997678b95bf469e446366c8
Author: wind <[email protected]>
AuthorDate: Wed Jan 12 11:59:51 2022 +0800
[cherry-pick-2.0.3] cache manager (#7882)
* cherry-pick [DS-6963][MasterServer]add cache manager for tenant and user
* [cherry-pick][Feature-6988][MasterServer] add cache manager for workflow
* [Feature][MasterServer] add cache for workGroup and schedule #6987 (#7161)
* to #6987
* to #6987
* to #6987
* to #6987
* to #6987
* to #6987: @Param -> @Name
* to #6987: fix Sonar
* to #6987: fix Sonar
Co-authored-by: honghuo.zw <[email protected]>
* [cherry-pick][Feature-6963][MasterServer] unified cache manager
* [cherry-pick][Bug-7292][ApiServer] fix cache error when standalone
* [cherry-pick][Fix-7245] [API] Fix spelEvaluationException: EL1008E:
Property or field 'all' cannot be found
* bugfix
Co-authored-by: zwZjut <[email protected]>
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kerwin <[email protected]>
Co-authored-by: caishunfeng <[email protected]>
---
.../api/aspect/CacheEvictAspect.java | 138 ++++++++++++++
.../api/service/impl/QueueServiceImpl.java | 6 +-
.../api/service/impl/SchedulerServiceImpl.java | 1 +
.../api/service/impl/TenantServiceImpl.java | 34 ++--
.../api/service/impl/UsersServiceImpl.java | 12 +-
.../api/service/QueueServiceTest.java | 54 +++---
.../api/service/TenantServiceTest.java | 23 ++-
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../dolphinscheduler/common/enums/CacheType.java | 49 +++--
.../dao/datasource/SpringConnectionFactory.java | 2 +
.../dao/mapper/ProcessDefinitionLogMapper.java | 23 ++-
.../dao/mapper/ProcessDefinitionMapper.java | 25 ++-
.../dao/mapper/ProcessTaskRelationMapper.java | 61 ++++---
.../dao/mapper/ScheduleMapper.java | 33 +++-
.../dao/mapper/TaskDefinitionLogMapper.java | 44 +++--
.../dolphinscheduler/dao/mapper/TenantMapper.java | 29 ++-
.../dolphinscheduler/dao/mapper/UserMapper.java | 47 ++++-
.../dao/mapper/WorkerGroupMapper.java | 19 ++
dolphinscheduler-dist/release-docs/LICENSE | 3 +
.../release-docs/licenses/LICENSE-caffeine.txt | 202 +++++++++++++++++++++
.../release-docs/licenses/LICENSE-checker-qual.txt | 22 +++
.../licenses/LICENSE-spring-boot-starter-cache.txt | 202 +++++++++++++++++++++
.../remote/command/CacheExpireCommand.java | 67 +++++++
.../remote/command/CommandType.java | 7 +-
.../command/cache/CacheExpireCommandTest.java | 38 ++--
dolphinscheduler-server/pom.xml | 14 ++
.../server/master/MasterServer.java | 4 +
.../server/master/config/MasterConfig.java | 11 --
.../server/master/processor/CacheProcessor.java | 73 ++++++++
.../server/master/runner/EventExecuteService.java | 7 +-
.../master/runner/MasterSchedulerService.java | 23 +--
.../master/runner/WorkflowExecuteThread.java | 18 +-
.../src/main/resources/application-master.yaml | 28 +--
.../master/processor/CacheProcessorTest.java | 74 ++++++++
dolphinscheduler-service/pom.xml | 5 +
.../service/alert/ProcessAlertManager.java | 48 +++--
.../service/cache/CacheNotifyService.java | 30 +--
.../service/cache/impl/CacheKeyGenerator.java | 34 ++--
.../service/cache/impl/CacheNotifyServiceImpl.java | 135 ++++++++++++++
.../service/process/ProcessService.java | 41 +++--
.../service/cache/CacheNotifyServiceTest.java | 86 +++++++++
.../service/process/ProcessServiceTest.java | 20 +-
.../src/main/resources/application-standalone.yaml | 13 ++
tools/dependencies/known-dependencies.txt | 3 +
44 files changed, 1478 insertions(+), 331 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
new file mode 100644
index 0000000..e7f1ba1
--- /dev/null
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.aspect;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
+import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.stereotype.Component;
+
+/**
+ * Aspect that wraps every method annotated with {@link CacheEvict}.
+ * <p>
+ * After the intercepted method has returned, the aspect works out which cache
+ * ({@link CacheType}) and which key the eviction refers to and hands a
+ * {@link CacheExpireCommand} to {@link CacheNotifyService#notifyMaster}.
+ */
+@Aspect
+@Component
+public class CacheEvictAspect {
+
+    private static final Logger logger = LoggerFactory.getLogger(CacheEvictAspect.class);
+
+    /**
+     * marker of a Spring EL expression: a {@code @CacheEvict} key containing it is evaluated,
+     * any other non-empty key is used verbatim
+     */
+    private static final String EL_SYMBOL = "#";
+
+    /**
+     * prefix of the positional Spring EL variables ({@code p0}, {@code p1}, ...) bound to the method arguments
+     */
+    private static final String P = "p";
+
+    @Autowired
+    private CacheKeyGenerator cacheKeyGenerator;
+
+    @Autowired
+    private CacheNotifyService cacheNotifyService;
+
+    /**
+     * Matches the execution of any method carrying {@link CacheEvict}.
+     */
+    @Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)")
+    public void cacheEvictPointCut() {
+        // Do nothing because it's a pointcut
+    }
+
+    /**
+     * Runs the intercepted method and then notifies the master about the evicted cache entry.
+     *
+     * @param proceedingJoinPoint join point of the {@code @CacheEvict} method
+     * @return whatever the intercepted method returned
+     * @throws Throwable anything thrown by the intercepted method, by key resolution or by the notification
+     */
+    @Around("cacheEvictPointCut()")
+    public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        MethodSignature sign = (MethodSignature) proceedingJoinPoint.getSignature();
+        Method method = sign.getMethod();
+        Object target = proceedingJoinPoint.getTarget();
+        Object[] args = proceedingJoinPoint.getArgs();
+
+        // proceed first: if the intercepted method throws, no notification is sent
+        Object result = proceedingJoinPoint.proceed();
+
+        // the class-level annotation is optional and may therefore be null
+        CacheConfig cacheConfig = method.getDeclaringClass().getAnnotation(CacheConfig.class);
+        CacheEvict cacheEvict = method.getAnnotation(CacheEvict.class);
+
+        CacheType cacheType = getCacheType(cacheConfig, cacheEvict);
+        if (cacheType != null) {
+            String cacheKey;
+            if (cacheEvict.key().isEmpty()) {
+                // no explicit key: fall back to the shared key generator
+                cacheKey = (String) cacheKeyGenerator.generate(target, method, args);
+            } else {
+                cacheKey = cacheEvict.key();
+                if (cacheEvict.key().contains(EL_SYMBOL)) {
+                    cacheKey = parseKey(cacheEvict.key(), Arrays.asList(args));
+                }
+            }
+            if (StringUtils.isNotEmpty(cacheKey)) {
+                cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, cacheKey).convert2Command());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Resolves the {@link CacheType} whose cache name matches the eviction.
+     * <p>
+     * The first cache name declared on {@code @CacheEvict} wins; only when the method declares
+     * none is the first name of the class-level {@code @CacheConfig} used. This mirrors Spring,
+     * where operation-level settings override {@code @CacheConfig}.
+     *
+     * @param cacheConfig class-level annotation, {@code null} when the declaring class has none
+     * @param cacheEvict method-level annotation
+     * @return the matching cache type, or {@code null} when no name is declared or no type matches
+     */
+    private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) {
+        String cacheName = null;
+        if (cacheEvict.cacheNames().length > 0) {
+            cacheName = cacheEvict.cacheNames()[0];
+        } else if (cacheConfig != null && cacheConfig.cacheNames().length > 0) {
+            // guard against classes without @CacheConfig instead of failing with a NullPointerException
+            cacheName = cacheConfig.cacheNames()[0];
+        }
+        if (cacheName == null) {
+            return null;
+        }
+        for (CacheType cacheType : CacheType.values()) {
+            if (cacheType.getCacheName().equals(cacheName)) {
+                return cacheType;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Evaluates a Spring EL key expression against the method arguments.
+     *
+     * @param key Spring EL expression, e.g. one referring to {@code #p0}
+     * @param paramList method arguments, exposed as variables {@code p0}, {@code p1}, ... in call order
+     * @return string form of the evaluated expression
+     * @throws RuntimeException when the expression evaluates to {@code null}
+     */
+    private String parseKey(String key, List<Object> paramList) {
+        SpelExpressionParser spelParser = new SpelExpressionParser();
+        EvaluationContext ctx = new StandardEvaluationContext();
+        for (int i = 0; i < paramList.size(); i++) {
+            ctx.setVariable(P + i, paramList.get(i));
+        }
+        Object obj = spelParser.parseExpression(key).getValue(ctx);
+        if (null == obj) {
+            throw new RuntimeException("parseKey error");
+        }
+        return obj.toString();
+    }
+}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index 8169541..2da89df 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -89,7 +89,7 @@ public class QueueServiceImpl extends BaseServiceImpl
implements QueueService {
public Result queryList(User loginUser, String searchVal, Integer pageNo,
Integer pageSize) {
Result result = new Result();
if (!isAdmin(loginUser)) {
- putMsg(result,Status.USER_NO_OPERATION_PERM);
+ putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -231,7 +231,7 @@ public class QueueServiceImpl extends BaseServiceImpl
implements QueueService {
/**
* verify queue and queueName
*
- * @param queue queue
+ * @param queue queue
* @param queueName queue name
* @return true if the queue name not exists, otherwise return false
*/
@@ -320,7 +320,7 @@ public class QueueServiceImpl extends BaseServiceImpl
implements QueueService {
* @param newQueue new queue name
* @return true if need to update user
*/
- private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) {
+ private boolean checkIfQueueIsInUsing(String oldQueue, String newQueue) {
return !oldQueue.equals(newQueue) && userMapper.existUser(oldQueue) ==
Boolean.TRUE;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index c9af32d..a56c6fe 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -363,6 +363,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
return result;
}
} catch (Exception e) {
+ logger.error("set online failure", e);
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ?
"set online failure" : "set offline failure");
throw new ServiceException(result.get(Constants.MSG).toString());
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 7c90bcf..0601725 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.api.service.impl;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -37,15 +33,22 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
/**
* tenant service impl
*/
@@ -67,10 +70,10 @@ public class TenantServiceImpl extends BaseServiceImpl
implements TenantService
/**
* create tenant
*
- * @param loginUser login user
+ * @param loginUser login user
* @param tenantCode tenant code
- * @param queueId queue id
- * @param desc description
+ * @param queueId queue id
+ * @param desc description
* @return create result code
* @throws Exception exception
*/
@@ -133,7 +136,7 @@ public class TenantServiceImpl extends BaseServiceImpl
implements TenantService
Result result = new Result();
if (!isAdmin(loginUser)) {
- putMsg(result,Status.USER_NO_OPERATION_PERM);
+ putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -152,11 +155,11 @@ public class TenantServiceImpl extends BaseServiceImpl
implements TenantService
/**
* updateProcessInstance tenant
*
- * @param loginUser login user
- * @param id tenant id
+ * @param loginUser login user
+ * @param id tenant id
* @param tenantCode tenant code
- * @param queueId queue id
- * @param desc description
+ * @param queueId queue id
+ * @param desc description
* @return update result code
* @throws Exception exception
*/
@@ -269,6 +272,7 @@ public class TenantServiceImpl extends BaseServiceImpl
implements TenantService
tenantMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
+
putMsg(result, Status.SUCCESS);
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index ab7a244..3b5e0d6 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
@@ -53,6 +52,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@@ -118,7 +118,6 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
@Autowired
private ProjectMapper projectMapper;
-
/**
* create user, only system admin have permission
*
@@ -323,7 +322,7 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
public Result queryUserList(User loginUser, String searchVal, Integer
pageNo, Integer pageSize) {
Result result = new Result();
if (!isAdmin(loginUser)) {
- putMsg(result,Status.USER_NO_OPERATION_PERM);
+ putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -343,8 +342,6 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
/**
* updateProcessInstance user
*
- *
- * @param loginUser
* @param userId user id
* @param userName user name
* @param userPassword user password
@@ -475,6 +472,7 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
// updateProcessInstance user
userMapper.updateById(user);
+
putMsg(result, Status.SUCCESS);
return result;
}
@@ -522,8 +520,9 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
}
accessTokenMapper.deleteAccessTokenByUserId(id);
-
+
userMapper.deleteById(id);
+
putMsg(result, Status.SUCCESS);
return result;
@@ -1151,6 +1150,7 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
Date now = new Date();
user.setUpdateTime(now);
userMapper.updateById(user);
+
User responseUser = userMapper.queryByUserNameAccurately(userName);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, responseUser);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
index 02e3ea1..f3167a5 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
@@ -73,7 +73,7 @@ public class QueueServiceTest {
}
@After
- public void after(){
+ public void after() {
}
@Test
@@ -82,7 +82,7 @@ public class QueueServiceTest {
Mockito.when(queueMapper.selectList(null)).thenReturn(getQueueList());
Map<String, Object> result = queueService.queryList(getLoginUser());
logger.info(result.toString());
- List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
+ List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(queueList));
}
@@ -90,13 +90,13 @@ public class QueueServiceTest {
@Test
public void testQueryListPage() {
- IPage<Queue> page = new Page<>(1,10);
+ IPage<Queue> page = new Page<>(1, 10);
page.setTotal(1L);
page.setRecords(getQueueList());
Mockito.when(queueMapper.queryQueuePaging(Mockito.any(Page.class),
Mockito.eq(queueName))).thenReturn(page);
- Result result = queueService.queryList(getLoginUser(),queueName,1,10);
+ Result result = queueService.queryList(getLoginUser(), queueName, 1,
10);
logger.info(result.toString());
- PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
+ PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList()));
}
@@ -104,17 +104,17 @@ public class QueueServiceTest {
public void testCreateQueue() {
// queue is null
- Map<String, Object> result =
queueService.createQueue(getLoginUser(),null,queueName);
+ Map<String, Object> result = queueService.createQueue(getLoginUser(),
null, queueName);
logger.info(result.toString());
-
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
+ Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
result.get(Constants.STATUS));
// queueName is null
- result = queueService.createQueue(getLoginUser(),queueName,null);
+ result = queueService.createQueue(getLoginUser(), queueName, null);
logger.info(result.toString());
-
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
+ Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
result.get(Constants.STATUS));
// correct
- result = queueService.createQueue(getLoginUser(),queueName,queueName);
+ result = queueService.createQueue(getLoginUser(), queueName,
queueName);
logger.info(result.toString());
- Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
+ Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@@ -126,25 +126,25 @@ public class QueueServiceTest {
Mockito.when(queueMapper.existQueue(null, "test")).thenReturn(true);
// not exist
- Map<String, Object> result =
queueService.updateQueue(getLoginUser(),0,"queue",queueName);
+ Map<String, Object> result = queueService.updateQueue(getLoginUser(),
0, "queue", queueName);
logger.info(result.toString());
-
Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(), ((Status)
result.get(Constants.STATUS)).getCode());
//no need update
- result =
queueService.updateQueue(getLoginUser(),1,queueName,queueName);
+ result = queueService.updateQueue(getLoginUser(), 1, queueName,
queueName);
logger.info(result.toString());
-
Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(), ((Status)
result.get(Constants.STATUS)).getCode());
//queue exist
- result = queueService.updateQueue(getLoginUser(),1,"test",queueName);
+ result = queueService.updateQueue(getLoginUser(), 1, "test",
queueName);
logger.info(result.toString());
-
Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(), ((Status)
result.get(Constants.STATUS)).getCode());
// queueName exist
- result = queueService.updateQueue(getLoginUser(),1,"test1","test");
+ result = queueService.updateQueue(getLoginUser(), 1, "test1", "test");
logger.info(result.toString());
-
Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(), ((Status)
result.get(Constants.STATUS)).getCode());
//success
- result = queueService.updateQueue(getLoginUser(),1,"test1","test1");
+ result = queueService.updateQueue(getLoginUser(), 1, "test1", "test1");
logger.info(result.toString());
-
Assert.assertEquals(Status.SUCCESS.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.SUCCESS.getCode(), ((Status)
result.get(Constants.STATUS)).getCode());
}
@@ -155,27 +155,27 @@ public class QueueServiceTest {
Mockito.when(queueMapper.existQueue(null, queueName)).thenReturn(true);
//queue null
- Result result = queueService.verifyQueue(null,queueName);
+ Result result = queueService.verifyQueue(null, queueName);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(),
Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
//queueName null
- result = queueService.verifyQueue(queueName,null);
+ result = queueService.verifyQueue(queueName, null);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(),
Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
//exist queueName
- result = queueService.verifyQueue(queueName,queueName);
+ result = queueService.verifyQueue(queueName, queueName);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(),
Status.QUEUE_NAME_EXIST.getCode());
//exist queue
- result = queueService.verifyQueue(queueName,"test");
+ result = queueService.verifyQueue(queueName, "test");
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(),
Status.QUEUE_VALUE_EXIST.getCode());
// success
- result = queueService.verifyQueue("test","test");
+ result = queueService.verifyQueue("test", "test");
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(),
Status.SUCCESS.getCode());
@@ -183,7 +183,6 @@ public class QueueServiceTest {
/**
* create admin user
- * @return
*/
private User getLoginUser() {
@@ -201,7 +200,6 @@ public class QueueServiceTest {
/**
* get queue
- * @return
*/
private Queue getQueue() {
Queue queue = new Queue();
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index f51c7a8..e1c00d2 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -17,9 +17,6 @@
package org.apache.dolphinscheduler.api.service;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.TenantServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@@ -34,6 +31,13 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -44,9 +48,8 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* tenant service test
@@ -81,7 +84,7 @@ public class TenantServiceTest {
try {
//check tenantCode
Map<String, Object> result =
- tenantService.createTenant(getLoginUser(), "%!1111", 1,
"TenantServiceTest");
+ tenantService.createTenant(getLoginUser(), "%!1111", 1,
"TenantServiceTest");
logger.info(result.toString());
Assert.assertEquals(Status.CHECK_OS_TENANT_CODE_ERROR,
result.get(Constants.STATUS));
@@ -109,7 +112,7 @@ public class TenantServiceTest {
page.setRecords(getList());
page.setTotal(1L);
Mockito.when(tenantMapper.queryTenantPaging(Mockito.any(Page.class),
Mockito.eq("TenantServiceTest")))
- .thenReturn(page);
+ .thenReturn(page);
Result result = tenantService.queryTenantList(getLoginUser(),
"TenantServiceTest", 1, 10);
logger.info(result.toString());
PageInfo<Tenant> pageInfo = (PageInfo<Tenant>) result.getData();
@@ -124,7 +127,7 @@ public class TenantServiceTest {
try {
// id not exist
Map<String, Object> result =
- tenantService.updateTenant(getLoginUser(), 912222, tenantCode,
1, "desc");
+ tenantService.updateTenant(getLoginUser(), 912222,
tenantCode, 1, "desc");
logger.info(result.toString());
// success
Assert.assertEquals(Status.TENANT_NOT_EXIST,
result.get(Constants.STATUS));
@@ -143,7 +146,7 @@ public class TenantServiceTest {
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(processInstanceMapper.queryByTenantIdAndStatus(1,
Constants.NOT_TERMINATED_STATES))
- .thenReturn(getInstanceList());
+ .thenReturn(getInstanceList());
Mockito.when(processDefinitionMapper.queryDefinitionListByTenant(2)).thenReturn(getDefinitionsList());
Mockito.when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList());
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index efeb478..72b5325 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1095,4 +1095,5 @@ public final class Constants {
public static final int DRY_RUN_FLAG_NO = 0;
public static final int DRY_RUN_FLAG_YES = 1;
+ public static final String CACHE_KEY_VALUE_ALL = "'all'";
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
similarity index 55%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
copy to
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
index 21af4b3..f845b20 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
@@ -15,32 +15,25 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.mapper;
-
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
-
-import org.apache.ibatis.annotations.Param;
-
-import java.util.List;
-
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-
-/**
- * worker group mapper interface
- */
-public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
-
- /**
- * query all worker group
- * @return worker group list
- */
- List<WorkerGroup> queryAllWorkerGroup();
-
- /**
- * query worer grouop by name
- * @param name name
- * @return worker group list
- */
- List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
-
+package org.apache.dolphinscheduler.common.enums;
+
+public enum CacheType {
+ TENANT("tenant"),
+ USER("user"),
+ QUEUE("queue"),
+ PROCESS_DEFINITION("processDefinition"),
+ PROCESS_TASK_RELATION("processTaskRelation"),
+ TASK_DEFINITION("taskDefinition"),
+ WORKER_GROUP("workerGroup"),
+ SCHEDULE("schedule");
+
+ CacheType(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ private final String cacheName;
+
+ public String getCacheName() {
+ return cacheName;
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
index e44ac85..ca7fc80 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
@@ -39,6 +39,7 @@ import java.util.Properties;
@Configuration
public class SpringConnectionFactory {
+
@Bean
public PaginationInterceptor paginationInterceptor() {
return new PaginationInterceptor();
@@ -57,6 +58,7 @@ public class SpringConnectionFactory {
configuration.setCallSettersOnNulls(true);
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.addInterceptor(paginationInterceptor());
+
configuration.setGlobalConfig(new GlobalConfig().setBanner(false));
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new
MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 51ccb73..bd9297e 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -23,6 +23,9 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -30,9 +33,20 @@ import
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process definition log mapper interface
*/
+@CacheConfig(cacheNames = "processDefinition", keyGenerator =
"cacheKeyGenerator")
public interface ProcessDefinitionLogMapper extends
BaseMapper<ProcessDefinitionLog> {
/**
+ * query a specific version of a process definition log by process definition
code and version number
+ *
+ * @param code process definition code
+ * @param version version number
+ * @return the process definition version info
+ */
+ @Cacheable(sync = true)
+ ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long
code, @Param("version") int version);
+
+ /**
* query process definition log by name
*
* @param projectCode projectCode
@@ -60,15 +74,6 @@ public interface ProcessDefinitionLogMapper extends
BaseMapper<ProcessDefinition
ProcessDefinitionLog queryMaxVersionDefinitionLog(@Param("code") long
code);
/**
- * query the certain process definition version info by process definition
code and version number
- *
- * @param code process definition code
- * @param version version number
- * @return the process definition version info
- */
- ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long
code, @Param("version") int version);
-
- /**
* query the paging process definition version list by pagination info
*
* @param page pagination info
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 3a731ea..912b8f3 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
@@ -28,13 +27,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process definition mapper interface
*/
+@CacheConfig(cacheNames = "processDefinition", keyGenerator =
"cacheKeyGenerator")
public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition>
{
/**
@@ -43,15 +46,14 @@ public interface ProcessDefinitionMapper extends
BaseMapper<ProcessDefinition> {
* @param code code
* @return process definition
*/
+ @Cacheable(sync = true)
ProcessDefinition queryByCode(@Param("code") long code);
/**
- * query process definition by code list
- *
- * @param codes codes
- * @return process definition list
+ * update process definition by id and evict the cache entry keyed by its code
*/
- List<ProcessDefinition> queryByCodes(@Param("codes") Collection<Long>
codes);
+ @CacheEvict(key = "#p0.code")
+ int updateById(@Param("et") ProcessDefinition processDefinition);
/**
* delete process definition by code
@@ -59,9 +61,18 @@ public interface ProcessDefinitionMapper extends
BaseMapper<ProcessDefinition> {
* @param code code
* @return delete result
*/
+ @CacheEvict
int deleteByCode(@Param("code") long code);
/**
+ * query process definition by code list
+ *
+ * @param codes codes
+ * @return process definition list
+ */
+ List<ProcessDefinition> queryByCodes(@Param("codes") Collection<Long>
codes);
+
+ /**
* verify process definition by name
*
* @param projectCode projectCode
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 5cb5a77..ca169b8 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -25,11 +25,16 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* process task relation mapper interface
*/
+@CacheConfig(cacheNames = "processTaskRelation", keyGenerator =
"cacheKeyGenerator")
public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelation> {
/**
@@ -39,10 +44,27 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return ProcessTaskRelation list
*/
+ @Cacheable(unless = "#result == null || #result.size() == 0")
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long
projectCode,
@Param("processCode") long
processCode);
/**
+ * update process task relation by id and evict the cache entry keyed by projectCode_processDefinitionCode
+ */
+ @CacheEvict(key = "#p0.projectCode + '_' + #p0.processDefinitionCode")
+ int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
+
+ /**
+ * delete process task relation by processCode
+ *
+ * @param projectCode projectCode
+ * @param processCode processCode
+ * @return int
+ */
+ @CacheEvict
+ int deleteByCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);
+
+ /**
* process task relation by taskCode
*
* @param taskCodes taskCode list
@@ -59,16 +81,6 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
List<ProcessTaskRelation> queryByTaskCode(@Param("taskCode") long
taskCode);
/**
- * delete process task relation by processCode
- *
- * @param projectCode projectCode
- * @param processCode processCode
- * @return int
- */
- int deleteByCode(@Param("projectCode") long projectCode,
- @Param("processCode") long processCode);
-
- /**
* batch insert process task relation
*
* @param taskRelationList taskRelationList
@@ -88,7 +100,7 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* query upstream process task relation by taskCode
*
* @param projectCode projectCode
- * @param taskCode taskCode
+ * @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
@@ -97,7 +109,7 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* query downstream process task relation by taskCode
*
* @param projectCode projectCode
- * @param taskCode taskCode
+ * @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
@@ -105,31 +117,32 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* query task relation by codes
*
- * @param projectCode projectCode
- * @param taskCode taskCode
+ * @param projectCode projectCode
+ * @param taskCode taskCode
* @param preTaskCodes preTaskCode list
* @return ProcessTaskRelation
*/
- List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[]
preTaskCodes);
+ List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode, @Param("preTaskCodes") Long[]
preTaskCodes);
/**
* count upstream by codes
*
* @param projectCode projectCode
- * @param taskCode taskCode
- * @param processDefinitionCodes processDefinitionCodes
+ * @param taskCode taskCode
+ * @param processDefinitionCodes processDefinitionCodes
* @return upstream count list group by process definition code
*/
List<Map<String, Long>>
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long
projectCode,
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
@Param("taskCode") long taskCode);
+
/**
* query by code
*
- * @param projectCode projectCode
+ * @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
- * @param preTaskCode preTaskCode
- * @param postTaskCode postTaskCode
+ * @param preTaskCode preTaskCode
+ * @param postTaskCode postTaskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryByCode(@Param("projectCode") long
projectCode,
@@ -140,7 +153,7 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* delete process task relation
*
- * @param processTaskRelationLog processTaskRelationLog
+ * @param processTaskRelationLog processTaskRelationLog
* @return int
*/
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog
processTaskRelationLog);
@@ -148,10 +161,10 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* count by code
*
- * @param projectCode projectCode
+ * @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
- * @param preTaskCode preTaskCode
- * @param postTaskCode postTaskCode
+ * @param preTaskCode preTaskCode
+ * @param postTaskCode postTaskCode
* @return ProcessTaskRelation
*/
int countByCode(@Param("projectCode") long projectCode,
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index be7369c..d0b2d32 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -22,16 +23,37 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* scheduler mapper interface
*/
+@CacheConfig(cacheNames = "schedule", keyGenerator = "cacheKeyGenerator")
public interface ScheduleMapper extends BaseMapper<Schedule> {
+ @CacheEvict(key = "#p0.processDefinitionCode")
+ int insert(Schedule entity);
+
+ @CacheEvict(key = "#p0.processDefinitionCode")
+ int updateById(@Param("et") Schedule entity);
+
+ /**
+ * query schedule list by process definition code
+ *
+ * @param processDefinitionCode processDefinitionCode
+ * @return schedule list
+ */
+ @Cacheable(sync = true)
+ List<Schedule>
queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode")
long processDefinitionCode);
+
/**
* scheduler page
+ *
* @param page page
* @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal
@@ -43,6 +65,7 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/**
* query schedule list by project name
+ *
* @param projectName projectName
* @return schedule list
*/
@@ -50,6 +73,7 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/**
* query schedule list by process definition codes
+ *
* @param processDefineCodes processDefineCodes
* @return schedule list
*/
@@ -57,16 +81,9 @@ public interface ScheduleMapper extends BaseMapper<Schedule>
{
/**
* query schedule list by process definition code
+ *
* @param processDefinitionCode processDefinitionCode
* @return schedule
*/
Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long
processDefinitionCode);
-
- /**
- * query schedule list by process definition code
- * @param processDefinitionCode processDefinitionCode
- * @return schedule list
- */
- List<Schedule>
queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode")
long processDefinitionCode);
-
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index ab2620f..851d8da 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -25,6 +25,10 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -32,24 +36,41 @@ import
com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task definition log mapper interface
*/
+@CacheConfig(cacheNames = "taskDefinition", keyGenerator = "cacheKeyGenerator")
public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog>
{
/**
- * query max version for definition
+ * query task definition log
*
* @param code taskDefinitionCode
+ * @param version version
+ * @return task definition log
*/
- Integer queryMaxVersionForDefinition(@Param("code") long code);
+ @Cacheable(sync = true)
+ TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long
code, @Param("version") int version);
/**
- * query task definition log
+ * update task definition log by id and evict the cache entry keyed by code_version
+ */
+ @CacheEvict(key = "#p0.code + '_' + #p0.version")
+ int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
+
+ /**
+ * delete a specific task definition version by task definition code and
version
+ *
+ * @param code task definition code
+ * @param version task definition version
+ * @return delete result
+ */
+ @CacheEvict
+ int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int
version);
+
+ /**
+ * query max version for definition
*
* @param code taskDefinitionCode
- * @param version version
- * @return task definition log
*/
- TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
- @Param("version") int
version);
+ Integer queryMaxVersionForDefinition(@Param("code") long code);
/**
* @param taskDefinitions taskDefinition list
@@ -66,15 +87,6 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
int batchInsert(@Param("taskDefinitionLogs") List<TaskDefinitionLog>
taskDefinitionLogs);
/**
- * delete the certain task definition version by task definition code and
version
- *
- * @param code task definition code
- * @param version task definition version
- * @return delete result
- */
- int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int
version);
-
- /**
* query the paging task definition version list by pagination info
*
* @param page pagination info
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
index 89b6237..88f5600 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
@@ -14,29 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Tenant;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
+
import org.apache.ibatis.annotations.Param;
-import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* tenant mapper interface
*/
+@CacheConfig(cacheNames = "tenant", keyGenerator = "cacheKeyGenerator")
public interface TenantMapper extends BaseMapper<Tenant> {
/**
* query tenant by id
+ *
* @param tenantId tenantId
* @return tenant
*/
+ @Cacheable(sync = true)
Tenant queryById(@Param("tenantId") int tenantId);
/**
+ * delete tenant by id and evict it from the tenant cache
+ */
+ @CacheEvict
+ int deleteById(int id);
+
+ /**
+ * update tenant by id and evict the cache entry keyed by tenant id
+ */
+ @CacheEvict(key = "#p0.id")
+ int updateById(@Param("et") Tenant tenant);
+
+ /**
* query tenant by code
+ *
* @param tenantCode tenantCode
* @return tenant
*/
@@ -44,6 +65,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* tenant page
+ *
* @param page page
* @param searchVal searchVal
* @return tenant IPage
@@ -53,6 +75,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* check tenant exist
+ *
* @param tenantCode tenantCode
* @return true if exist else return null
*/
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
index 9fcb488..bd61141 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
@@ -14,29 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.User;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
/**
* user mapper interface
*/
+@CacheConfig(cacheNames = "user", keyGenerator = "cacheKeyGenerator")
public interface UserMapper extends BaseMapper<User> {
/**
+ * select user by id; the result is stored in the user cache
+ */
+ @Cacheable(sync = true)
+ User selectById(int id);
+
+ /**
+ * delete user by id and evict it from the user cache
+ */
+ @CacheEvict
+ int deleteById(int id);
+
+ /**
+ * update user by id and evict the cache entry keyed by user id
+ */
+ @CacheEvict(key = "#p0.id")
+ int updateById(@Param("et") User user);
+
+ /**
* query all general user
+ *
* @return user list
*/
List<User> queryAllGeneralUser();
/**
* query user by name
+ *
* @param userName userName
* @return user
*/
@@ -44,6 +72,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by userName and password
+ *
* @param userName userName
* @param password password
* @return user
@@ -53,6 +82,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* user page
+ *
* @param page page
* @param userName userName
* @return user IPage
@@ -62,6 +92,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user detail by id
+ *
* @param userId userId
* @return user
*/
@@ -69,6 +100,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user list by alertgroupId
+ *
* @param alertgroupId alertgroupId
* @return user list
*/
@@ -76,6 +108,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user list by tenantId
+ *
* @param tenantId tenantId
* @return user list
*/
@@ -83,6 +116,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by userId
+ *
* @param userId userId
* @return user
*/
@@ -90,6 +124,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by token
+ *
* @param token token
* @return user
*/
@@ -97,6 +132,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by queue name
+ *
* @param queueName queue name
* @return user list
*/
@@ -104,13 +140,15 @@ public interface UserMapper extends BaseMapper<User> {
/**
* check the user exist
- * @param queueName queue name
+ *
+ * @param queue queue name
* @return true if exist else return null
*/
Boolean existUser(@Param("queue") String queue);
/**
* update user with old queue
+ *
* @param oldQueue old queue name
* @param newQueue new queue name
* @return update rows
@@ -127,6 +165,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query authed user list by projectId
+ *
* @param projectId projectId
* @return user list
*/
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index 21af4b3..fcff987 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -17,27 +17,46 @@
package org.apache.dolphinscheduler.dao.mapper;
+import static org.apache.dolphinscheduler.common.Constants.CACHE_KEY_VALUE_ALL;
+
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* worker group mapper interface
*/
+@CacheConfig(cacheNames = "workerGroup", keyGenerator = "cacheKeyGenerator")
public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
/**
* query all worker group
+ *
* @return worker group list
*/
+ @Cacheable(sync = true, key = CACHE_KEY_VALUE_ALL)
List<WorkerGroup> queryAllWorkerGroup();
+ @CacheEvict(key = CACHE_KEY_VALUE_ALL)
+ int deleteById(Integer id);
+
+ @CacheEvict(key = CACHE_KEY_VALUE_ALL)
+ int insert(WorkerGroup entity);
+
+ @CacheEvict(key = CACHE_KEY_VALUE_ALL)
+ int updateById(@Param("et") WorkerGroup entity);
+
/**
* query worer grouop by name
+ *
* @param name name
* @return worker group list
*/
diff --git a/dolphinscheduler-dist/release-docs/LICENSE
b/dolphinscheduler-dist/release-docs/LICENSE
index a9abb80..5be5dde 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -225,6 +225,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
aws-sdk-java 1.7.4:
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk/1.7.4, Apache 2.0
bonecp 0.8.0.RELEASE: https://github.com/wwadge/bonecp, Apache 2.0
byte-buddy 1.9.16:
https://mvnrepository.com/artifact/net.bytebuddy/byte-buddy/1.9.16, Apache 2.0
+ caffeine 2.9.2:
https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine/2.9.2,
Apache 2.0
classmate 1.5.1:
https://mvnrepository.com/artifact/com.fasterxml/classmate/1.5.1, Apache 2.0
clickhouse-jdbc 0.1.52:
https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc/0.1.52,
Apache 2.0
commons-beanutils 1.9.4
https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils/1.9.4,
Apache 2.0
@@ -369,6 +370,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
spring-boot-starter-logging 2.5.6:
https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-logging/2.5.6,
Apache 2.0
spring-boot-starter-quartz 2.5.6:
https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-quartz/2.5.6,
Apache 2.0
spring-boot-starter-web 2.5.6:
https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web/2.5.6,
Apache 2.0
+ spring-boot-starter-cache 2.5.6:
https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-cache/2.5.6,
Apache 2.0
spring-context 5.3.12:
https://mvnrepository.com/artifact/org.springframework/spring-context/5.3.12,
Apache 2.0
spring-context-support 5.3.12:
https://mvnrepository.com/artifact/org.springframework/spring-context-support/5.3.12,
Apache 2.0
spring-core 5.3.12:
https://mvnrepository.com/artifact/org.springframework/spring-core, Apache 2.0
@@ -472,6 +474,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
slf4j-api 1.7.5:
https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.5, MIT
animal-sniffer-annotations 1.14
https://mvnrepository.com/artifact/org.codehaus.mojo/animal-sniffer-annotations/1.14,
MIT
checker-compat-qual 2.0.0
https://mvnrepository.com/artifact/org.checkerframework/checker-compat-qual/2.0.0,
MIT + GPLv2
+ checker-qual 3.10.0
https://mvnrepository.com/artifact/org.checkerframework/checker-qual/3.10.0,
MIT + GPLv2
Java-WebSocket 1.5.1: https://github.com/TooTallNate/Java-WebSocket MIT
========================================================================
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt
new file mode 100644
index 0000000..0b42f8f
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
\ No newline at end of file
diff --git
a/dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt
new file mode 100644
index 0000000..7b59b5c
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt
@@ -0,0 +1,22 @@
+Checker Framework qualifiers
+Copyright 2004-present by the Checker Framework developers
+
+MIT License:
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
\ No newline at end of file
diff --git
a/dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt
new file mode 100644
index 0000000..82714d7
--- /dev/null
+++
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
\ No newline at end of file
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
new file mode 100644
index 0000000..a32d4fc
--- /dev/null
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+/**
+ * cache expire command, carrying the cache type and key of the entry to evict
+ */
+public class CacheExpireCommand implements Serializable {
+
+ private CacheType cacheType;
+ private String cacheKey;
+
+ public CacheExpireCommand() {
+ super();
+ }
+
+ public CacheExpireCommand(CacheType cacheType, String cacheKey) {
+ this.cacheType = cacheType;
+ this.cacheKey = cacheKey;
+ }
+
+ public CacheType getCacheType() {
+ return cacheType;
+ }
+
+ public String getCacheKey() {
+ return cacheKey;
+ }
+
+ /**
+ * package this object as a CACHE_EXPIRE command with a JSON-serialized body
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.CACHE_EXPIRE);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CacheExpireCommand{CacheType=%s, cacheKey=%s}",
cacheType, cacheKey);
+ }
+}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 786d10c..cf2cfe5 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -136,5 +136,10 @@ public enum CommandType {
/**
* state event request
*/
- STATE_EVENT_REQUEST;
+ STATE_EVENT_REQUEST,
+
+ /**
+ * cache expire
+ */
+ CACHE_EXPIRE;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
similarity index 54%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
copy to
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
index 21af4b3..2351234 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
@@ -15,32 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.mapper;
+package org.apache.dolphinscheduler.remote.command.cache;
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.ibatis.annotations.Param;
+import org.junit.Assert;
+import org.junit.Test;
-import java.util.List;
-
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-
-/**
- * worker group mapper interface
- */
-public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
-
- /**
- * query all worker group
- * @return worker group list
- */
- List<WorkerGroup> queryAllWorkerGroup();
-
- /**
- * query worer grouop by name
- * @param name name
- * @return worker group list
- */
- List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
+public class CacheExpireCommandTest {
+ @Test
+ public void testConvert2Command() {
+ CacheExpireCommand cacheExpireCommand = new
CacheExpireCommand(CacheType.TENANT, "1");
+ Command command = cacheExpireCommand.convert2Command();
+ Assert.assertEquals(CommandType.CACHE_EXPIRE, command.getType());
+ }
}
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index bb5fef9..a629ab6 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -125,6 +125,20 @@
<artifactId>spring-boot-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-cache</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-api</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-to-slf4j</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index a6642a1..179b702 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
@@ -49,6 +50,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.event.EventListener;
@@ -66,6 +68,7 @@ import
org.springframework.transaction.annotation.EnableTransactionManagement;
})
})
@EnableTransactionManagement
+@EnableCaching
public class MasterServer implements IStoppable {
/**
* logger of MasterServer
@@ -146,6 +149,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK,
ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE,
taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST,
stateEventProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE,
new CacheProcessor());
this.nettyRemotingServer.start();
// self tolerant
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index b7e5642..c075415 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -60,9 +60,6 @@ public class MasterConfig {
@Value("${master.reserved.memory:0.3}")
private double masterReservedMemory;
- @Value("${master.cache.process.definition:true}")
- private boolean masterCacheProcessDefinition;
-
@Value("${master.failover.interval:10}")
private int failoverInterval;
@@ -163,14 +160,6 @@ public class MasterConfig {
this.stateWheelInterval = stateWheelInterval;
}
- public boolean getMasterCacheProcessDefinition() {
- return masterCacheProcessDefinition;
- }
-
- public void setMasterCacheProcessDefinition(boolean
masterCacheProcessDefinition) {
- this.masterCacheProcessDefinition = masterCacheProcessDefinition;
- }
-
public int getFailoverInterval() {
return failoverInterval;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
new file mode 100644
index 0000000..6db7f65
--- /dev/null
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * processes cache expire commands from master/api by evicting the matching cache entry
+ */
+public class CacheProcessor implements NettyRequestProcessor {
+
+ private final Logger logger =
LoggerFactory.getLogger(CacheProcessor.class);
+
+ private CacheManager cacheManager;
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.CACHE_EXPIRE ==
command.getType(), String.format("invalid command type: %s",
command.getType()));
+
+ CacheExpireCommand cacheExpireCommand =
JSONUtils.parseObject(command.getBody(), CacheExpireCommand.class);
+
+ logger.info("received command : {}", cacheExpireCommand);
+
+ this.cacheExpire(cacheExpireCommand);
+ }
+
+ private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
+ if (cacheManager == null) {
+ cacheManager =
SpringApplicationContext.getBean(CacheManager.class);
+ }
+
+ if (cacheExpireCommand.getCacheKey().isEmpty()) {
+ return;
+ }
+
+ CacheType cacheType = cacheExpireCommand.getCacheType();
+ Cache cache = cacheManager.getCache(cacheType.getCacheName());
+ if (cache != null) {
+ cache.evict(cacheExpireCommand.getCacheKey());
+ logger.info("cache evict, type:{}, key:{}",
cacheType.getCacheName(), cacheExpireCommand.getCacheKey());
+ }
+ }
+}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 4184ba0..ae17bab 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -146,9 +147,11 @@ public class EventExecuteService extends Thread {
}
private void notifyProcessChanged() {
- Map<ProcessInstance, TaskInstance> fatherMaps
- =
processService.notifyProcessList(processInstanceId, 0);
+ if (Flag.NO ==
workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
+ return;
+ }
+ Map<ProcessInstance, TaskInstance> fatherMaps =
processService.notifyProcessList(processInstanceId);
for (ProcessInstance processInstance :
fatherMaps.keySet()) {
String address =
NetUtils.getAddr(masterConfig.getListenPort());
if
(processInstance.getHost().equalsIgnoreCase(address)) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 79cce4f..3aa97a5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -125,12 +125,6 @@ public class MasterSchedulerService extends Thread {
*/
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new
ConcurrentHashMap<>();
- /**
- * key:code-version
- * value: processDefinition
- */
- HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new
HashMap<>();
-
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@@ -207,14 +201,8 @@ public class MasterSchedulerService extends Thread {
if (command != null) {
logger.info("find one command: id: {}, type: {}", command.getId(),
command.getCommandType());
try {
- ProcessInstance processInstance =
processService.handleCommand(logger,
- getLocalAddress(),
- command,
- processDefinitionCacheMaps);
- if (!masterConfig.getMasterCacheProcessDefinition()
- && processDefinitionCacheMaps.size() > 0) {
- processDefinitionCacheMaps.clear();
- }
+ ProcessInstance processInstance =
processService.handleCommand(logger, getLocalAddress(), command);
+
if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new
WorkflowExecuteThread(
processInstance
@@ -258,16 +246,13 @@ public class MasterSchedulerService extends Thread {
}
for (Command command : commandList) {
int slot = ServerNodeManager.getSlot();
- if (ServerNodeManager.MASTER_SIZE != 0
- && command.getId() % ServerNodeManager.MASTER_SIZE ==
slot) {
+ if (ServerNodeManager.MASTER_SIZE != 0 && command.getId() %
ServerNodeManager.MASTER_SIZE == slot) {
result = command;
break;
}
}
if (result != null) {
- logger.info("find command {}, slot:{} :",
- result.getId(),
- ServerNodeManager.getSlot());
+ logger.info("find command {}, slot:{} :", result.getId(),
ServerNodeManager.getSlot());
break;
}
pageNumber += 1;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 7907686..8f8f450 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -48,9 +48,11 @@ import
org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
@@ -530,9 +532,11 @@ public class WorkflowExecuteThread implements Runnable {
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null,
processInstance);
}
- List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(processInstance.getId());
- ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- processAlertManager.sendAlertProcessInstance(processInstance,
taskInstances, projectUser);
+ if (processAlertManager.isNeedToSendWarning(processInstance)) {
+ List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(processInstance.getId());
+ ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+ processAlertManager.sendAlertProcessInstance(processInstance,
taskInstances, projectUser);
+ }
}
/**
@@ -544,11 +548,11 @@ public class WorkflowExecuteThread implements Runnable {
if (this.dag != null) {
return;
}
- processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion());
+ processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
recoverNodeIdList =
getStartTaskInstanceList(processInstance.getCommandParam());
- List<TaskNode> taskNodeList =
-
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(),
processDefinition.getCode()), Lists.newArrayList());
+ List<ProcessTaskRelation> processTaskRelationList =
processService.findRelationByCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ List<TaskDefinitionLog> taskDefinitionLogList =
processService.getTaskDefineLogListByRelation(processTaskRelationList);
+ List<TaskNode> taskNodeList =
processService.transformTask(processTaskRelationList, taskDefinitionLogList);
forbiddenTaskList.clear();
taskNodeList.forEach(taskNode -> {
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
b/dolphinscheduler-server/src/main/resources/application-master.yaml
similarity index 72%
copy from
dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
copy to dolphinscheduler-server/src/main/resources/application-master.yaml
index 136a8c7..737b542 100644
---
a/dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
+++ b/dolphinscheduler-server/src/main/resources/application-master.yaml
@@ -14,29 +14,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
spring:
application:
- name: standalone-server
+ name: master-server
+ cache:
+ # cache is disabled by default; enable it with `type: caffeine`
+ type: none
+ cache-names:
+ - tenant
+ - user
+ - processDefinition
+ - processTaskRelation
+ - taskDefinition
+ - workerGroup
+ - schedule
+ caffeine:
+ spec: maximumSize=100,expireAfterWrite=300s,recordStats
server:
- port: 12345
+ port: 5679
management:
endpoints:
web:
exposure:
include: '*'
- server:
- port: 8080
metrics:
tags:
application: ${spring.application.name}
-
-logging:
- level:
- org:
- apache:
- zookeeper: WARN
- hbase: WARN
- hadoop: WARN
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
new file mode 100644
index 0000000..5c177ca
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+
+import io.netty.channel.Channel;
+
+/**
+ * cache processor test
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class})
+public class CacheProcessorTest {
+
+ private CacheProcessor cacheProcessor;
+
+ @Mock
+ private Channel channel;
+
+ @Mock
+ private CacheManager cacheManager;
+
+ @Mock
+ private Cache cache;
+
+ @Before
+ public void before() {
+ PowerMockito.mockStatic(SpringApplicationContext.class);
+
PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager);
+
Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache);
+ cacheProcessor = new CacheProcessor();
+ }
+
+ @Test
+ public void testProcess() {
+ Tenant tenant = new Tenant();
+ tenant.setId(1);
+ CacheExpireCommand cacheExpireCommand = new
CacheExpireCommand(CacheType.TENANT, "1");
+ Command command = cacheExpireCommand.convert2Command();
+
+ cacheProcessor.process(channel, command);
+ }
+}
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index 259712d..5438e1c 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -89,6 +89,11 @@
<artifactId>micrometer-core</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 1ee7c5a..94813a0 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -210,43 +210,53 @@ public class ProcessAlertManager {
List<TaskInstance> taskInstances,
ProjectUser projectUser) {
- if (Flag.YES == processInstance.getIsSubProcess()) {
+ if (!isNeedToSendWarning(processInstance)) {
return;
}
- boolean sendWarnning = false;
+ Alert alert = new Alert();
+
+ String cmdName = getCommandCnName(processInstance.getCommandType());
+ String success = processInstance.getState().typeIsSuccess() ?
"success" : "failed";
+ alert.setTitle(cmdName + " " + success);
+ String content = getContentProcessInstance(processInstance,
taskInstances,projectUser);
+ alert.setContent(content);
+ alert.setAlertGroupId(processInstance.getWarningGroupId());
+ alert.setCreateTime(new Date());
+ alertDao.addAlert(alert);
+ logger.info("add alert to db , alert: {}", alert);
+ }
+
+ /**
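+ * check whether a warning needs to be sent for the process instance
+ *
+ * @param processInstance process instance whose warning type and state are checked
+ * @return false for a sub process; otherwise true when the state matches the configured warning type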
+ */
+ public boolean isNeedToSendWarning(ProcessInstance processInstance) {
+ if (Flag.YES == processInstance.getIsSubProcess()) {
+ return false;
+ }
+ boolean sendWarning = false;
WarningType warningType = processInstance.getWarningType();
switch (warningType) {
case ALL:
if (processInstance.getState().typeIsFinished()) {
- sendWarnning = true;
+ sendWarning = true;
}
break;
case SUCCESS:
if (processInstance.getState().typeIsSuccess()) {
- sendWarnning = true;
+ sendWarning = true;
}
break;
case FAILURE:
if (processInstance.getState().typeIsFailure()) {
- sendWarnning = true;
+ sendWarning = true;
}
break;
default:
}
- if (!sendWarnning) {
- return;
- }
- Alert alert = new Alert();
-
- String cmdName = getCommandCnName(processInstance.getCommandType());
- String success = processInstance.getState().typeIsSuccess() ?
"success" : "failed";
- alert.setTitle(cmdName + " " + success);
- String content = getContentProcessInstance(processInstance,
taskInstances,projectUser);
- alert.setContent(content);
- alert.setAlertGroupId(processInstance.getWarningGroupId());
- alert.setCreateTime(new Date());
- alertDao.addAlert(alert);
- logger.info("add alert to db , alert: {}", alert);
+ return sendWarning;
}
/**
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
similarity index 55%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
copy to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
index 21af4b3..09c5571 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
@@ -15,32 +15,10 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.mapper;
+package org.apache.dolphinscheduler.service.cache;
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
-
-import org.apache.ibatis.annotations.Param;
-
-import java.util.List;
-
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-
-/**
- * worker group mapper interface
- */
-public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
-
- /**
- * query all worker group
- * @return worker group list
- */
- List<WorkerGroup> queryAllWorkerGroup();
-
- /**
- * query worer grouop by name
- * @param name name
- * @return worker group list
- */
- List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
+import org.apache.dolphinscheduler.remote.command.Command;
+public interface CacheNotifyService {
+ void notifyMaster(Command command);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
similarity index 56%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
copy to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
index 21af4b3..2a03654 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
@@ -15,32 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.mapper;
+package org.apache.dolphinscheduler.service.cache.impl;
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import java.lang.reflect.Method;
-import org.apache.ibatis.annotations.Param;
-
-import java.util.List;
-
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
/**
- * worker group mapper interface
+ * custom cache key generator
*/
-public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
-
- /**
- * query all worker group
- * @return worker group list
- */
- List<WorkerGroup> queryAllWorkerGroup();
-
- /**
- * query worer grouop by name
- * @param name name
- * @return worker group list
- */
- List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
+@Component
+public class CacheKeyGenerator implements KeyGenerator {
+ @Override
+ public Object generate(Object target, Method method, Object... params) {
+ return StringUtils.arrayToDelimitedString(params, "_");
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
new file mode 100644
index 0000000..ffa9299
--- /dev/null
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.cache.impl;
+
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import io.netty.channel.Channel;
+
+/**
+ * cache notify service
+ */
+@Service
+public class CacheNotifyServiceImpl implements CacheNotifyService {
+
+ private final Logger logger =
LoggerFactory.getLogger(CacheNotifyServiceImpl.class);
+
+ @Autowired
+ private RegistryClient registryClient;
+
+ /**
+ * remote channels
+ */
+ private static final ConcurrentHashMap<Host, NettyRemoteChannel>
REMOTE_CHANNELS = new ConcurrentHashMap<>();
+
+ /**
+ * netty remoting client
+ */
+ private final NettyRemotingClient nettyRemotingClient;
+
+ public CacheNotifyServiceImpl() {
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ }
+
+ /**
+ * add channel
+ *
+ * @param channel channel
+ */
+ private void cache(Host host, NettyRemoteChannel channel) {
+ REMOTE_CHANNELS.put(host, channel);
+ }
+
+ /**
+ * remove channel
+ */
+ private void remove(Host host) {
+ REMOTE_CHANNELS.remove(host);
+ }
+
+ /**
+ * get remote channel
+ *
+ * @return netty remote channel
+ */
+ private NettyRemoteChannel getRemoteChannel(Host host) {
+ NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host);
+ if (nettyRemoteChannel != null) {
+ if (nettyRemoteChannel.isActive()) {
+ return nettyRemoteChannel;
+ } else {
+ this.remove(host);
+ }
+ }
+
+ Channel channel = nettyRemotingClient.getChannel(host);
+ if (channel == null) {
+ return null;
+ }
+
+ NettyRemoteChannel remoteChannel = new NettyRemoteChannel(channel);
+ this.cache(host, remoteChannel);
+ return remoteChannel;
+ }
+
+ /**
+ * send the command to every master server listed in the registry
+ *
+ * @param command command
+ */
+ @Override
+ public void notifyMaster(Command command) {
+ logger.info("send result, command:{}", command.toString());
+ try {
+ List<Server> serverList =
registryClient.getServerList(NodeType.MASTER);
+ if (CollectionUtils.isEmpty(serverList)) {
+ return;
+ }
+
+ for (Server server : serverList) {
+ Host host = new Host(server.getHost(), server.getPort());
+ NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
+ if (nettyRemoteChannel == null) {
+ continue;
+ }
+ nettyRemoteChannel.writeAndFlush(command);
+ }
+ } catch (Exception e) {
+ logger.error("notify master error", e);
+ }
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 8215ed3..f421d88 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process;
-import static java.util.stream.Collectors.toSet;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -28,6 +27,8 @@ import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static java.util.stream.Collectors.toSet;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -212,8 +213,8 @@ public class ProcessService {
* @return process instance
*/
@Transactional
- public ProcessInstance handleCommand(Logger logger, String host, Command
command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
- ProcessInstance processInstance = constructProcessInstance(command,
host, processDefinitionCacheMaps);
+ public ProcessInstance handleCommand(Logger logger, String host, Command
command) {
+ ProcessInstance processInstance = constructProcessInstance(command,
host);
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}",
command);
@@ -731,19 +732,12 @@ public class ProcessService {
* @param host host
* @return process instance
*/
- private ProcessInstance constructProcessInstance(Command command, String
host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
+ private ProcessInstance constructProcessInstance(Command command, String
host) {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
- String key = String.format("%d-%d",
command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
- if (processDefinitionCacheMaps.containsKey(key)) {
- processDefinition = processDefinitionCacheMaps.get(key);
- } else {
- processDefinition =
this.findProcessDefinition(command.getProcessDefinitionCode(),
command.getProcessDefinitionVersion());
- if (processDefinition != null) {
- processDefinitionCacheMaps.put(key, processDefinition);
- }
- }
+
+ processDefinition =
this.findProcessDefinition(command.getProcessDefinitionCode(),
command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code :
{}", command.getProcessDefinitionCode());
return null;
@@ -1952,7 +1946,7 @@ public class ProcessService {
if (Objects.isNull(user)) {
return StringUtils.EMPTY;
}
- Tenant tenant = tenantMapper.selectById(user.getTenantId());
+ Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (Objects.isNull(tenant)) {
return StringUtils.EMPTY;
}
@@ -2410,6 +2404,23 @@ public class ProcessService {
return
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
+ public List<TaskDefinitionLog>
getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
+ List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
+ Map<Long, Integer> taskCodeVersionMap = new HashMap<>();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (processTaskRelation.getPreTaskCode() > 0) {
+ taskCodeVersionMap.put(processTaskRelation.getPreTaskCode(),
processTaskRelation.getPreTaskVersion());
+ }
+ if (processTaskRelation.getPostTaskCode() > 0) {
+ taskCodeVersionMap.put(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion());
+ }
+ }
+ taskCodeVersionMap.forEach((code,version) -> {
+ taskDefinitionLogs.add((TaskDefinitionLog)
this.findTaskDefinition(code, version));
+ });
+ return taskDefinitionLogs;
+ }
+
/**
* find task definition by code and version
*/
@@ -2491,7 +2502,7 @@ public class ProcessService {
return taskNodeList;
}
- public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId,
int taskId) {
+ public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId)
{
HashMap<ProcessInstance, TaskInstance> processTaskMap = new
HashMap<>();
//find sub tasks
ProcessInstanceMap processInstanceMap =
processInstanceMapMapper.queryBySubProcessId(processId);
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
new file mode 100644
index 0000000..a3dafb6
--- /dev/null
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.cache;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.service.cache.impl.CacheNotifyServiceImpl;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * cache notify service test
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class CacheNotifyServiceTest {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @InjectMocks
+ private CacheNotifyServiceImpl cacheNotifyService;
+
+ @Mock
+ private RegistryClient registryClient;
+
+ @Test
+ public void testNotifyMaster() {
+ User user1 = new User();
+ user1.setId(100);
+ Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER,
"100").convert2Command();
+
+ NettyServerConfig serverConfig = new NettyServerConfig();
+
+ NettyRemotingServer nettyRemotingServer = new
NettyRemotingServer(serverConfig);
+ nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE,
(channel, command) -> {
+ Assert.assertEquals(cacheExpireCommand, command);
+ });
+ nettyRemotingServer.start();
+
+ List<Server> serverList = new ArrayList<>();
+ Server server = new Server();
+ server.setHost("127.0.0.1");
+ server.setPort(serverConfig.getListenPort());
+ serverList.add(server);
+
+
Mockito.when(registryClient.getServerList(NodeType.MASTER)).thenReturn(serverList);
+
+ cacheNotifyService.notifyMaster(cacheExpireCommand);
+
+ nettyRemotingServer.close();
+ }
+}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index da71460..fbc20b9a 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -119,8 +119,6 @@ public class ProcessServiceTest {
@Mock
private ResourceMapper resourceMapper;
- private HashMap<String, ProcessDefinition> processDefinitionCacheMaps =
new HashMap<>();
-
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
@@ -247,7 +245,7 @@ public class ProcessServiceTest {
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING +
"\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
- Assert.assertNull(processService.handleCommand(logger, host, command,
processDefinitionCacheMaps));
+ Assert.assertNull(processService.handleCommand(logger, host, command));
int definitionVersion = 1;
long definitionCode = 123;
@@ -274,7 +272,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new
ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
- Assert.assertNotNull(processService.handleCommand(logger, host,
command1, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command1));
Command command2 = new Command();
command2.setId(2);
@@ -284,8 +282,7 @@ public class ProcessServiceTest {
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
-
- Assert.assertNotNull(processService.handleCommand(logger, host,
command2, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command2));
Command command3 = new Command();
command3.setId(3);
@@ -295,8 +292,7 @@ public class ProcessServiceTest {
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
-
- Assert.assertNotNull(processService.handleCommand(logger, host,
command3, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command3));
Command command4 = new Command();
command4.setId(4);
@@ -306,8 +302,7 @@ public class ProcessServiceTest {
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
-
- Assert.assertNotNull(processService.handleCommand(logger, host,
command4, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host,
command4));
Command command5 = new Command();
command5.setId(5);
@@ -322,7 +317,7 @@ public class ProcessServiceTest {
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
- ProcessInstance processInstance1 =
processService.handleCommand(logger, host, command5,
processDefinitionCacheMaps);
+ ProcessInstance processInstance1 =
processService.handleCommand(logger, host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
}
@@ -370,6 +365,9 @@ public class ProcessServiceTest {
processTaskRelationLog.setPostTaskCode(postTaskCode);
processTaskRelationLog.setPostTaskVersion(postTaskVersion);
relationLogList.add(processTaskRelationLog);
+
Mockito.when(processDefineMapper.queryByCode(parentProcessDefineCode)).thenReturn(processDefinition);
+
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode
+ , parentProcessDefineVersion)).thenReturn(relationLogList);
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog();
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
index 136a8c7..64f4724 100644
---
a/dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
+++
b/dolphinscheduler-standalone-server/src/main/resources/application-standalone.yaml
@@ -18,6 +18,19 @@
spring:
application:
name: standalone-server
+ cache:
+ # cache is disabled by default; enable it by setting `type: caffeine`
+ type: none
+ cache-names:
+ - tenant
+ - user
+ - processDefinition
+ - processTaskRelation
+ - taskDefinition
+ - workerGroup
+ - schedule
+ caffeine:
+ spec: maximumSize=100,expireAfterWrite=300s,recordStats
server:
port: 12345
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index df30815..1c0214f 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -14,7 +14,9 @@ avro-1.7.4.jar
aws-java-sdk-1.7.4.jar
bonecp-0.8.0.RELEASE.jar
byte-buddy-1.9.16.jar
+caffeine-2.9.2.jar
checker-compat-qual-2.0.0.jar
+checker-qual-3.10.0.jar
classmate-1.5.1.jar
clickhouse-jdbc-0.1.52.jar
commons-email-1.5.jar
@@ -193,6 +195,7 @@ spring-boot-starter-json-2.5.6.jar
spring-boot-starter-logging-2.5.6.jar
spring-boot-starter-quartz-2.5.6.jar
spring-boot-starter-web-2.5.6.jar
+spring-boot-starter-cache-2.5.6.jar
spring-context-5.3.12.jar
spring-context-support-5.3.12.jar
spring-core-5.3.12.jar