This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b736b25dfca043e98c7f22690ea5d7b038ffbad2
Author: Shuai li <loney...@live.cn>
AuthorDate: Wed Oct 12 19:54:52 2022 +0800

    Fix secondstorage skipping index job error
---
 .../kap/secondstorage/SecondStorageIndexTest.java  | 11 +++++-----
 .../management/SecondStorageService.java           | 24 +++++++++++++++++-----
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
index fbfe232328..5f561c7183 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -348,8 +349,8 @@ public class SecondStorageIndexTest implements JobWaiter {
         String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet());
         waitJobEnd(getProject(), jobId);
 
-        assertThrows(MsgPicker.getMsg().getSecondStorageConcurrentOperate(), KylinException.class,
-                () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
+        assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), getProject()),
+                KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
 
         clickhouse[0].start();
         ClickHouseUtils.internalConfigClickHouse(clickhouse, replica);
@@ -562,11 +563,9 @@ public class SecondStorageIndexTest implements JobWaiter {
         assertEquals(1, tableEntity.getSecondaryIndexColumns().size());
         assertTrue(tableEntity.getSecondaryIndexColumns().contains(0));
 
-        buildIncrementalLoadQuery("2012-01-02", "2012-01-03",
-                new HashSet<>(
-                        NIndexPlanManager.getInstance(getConfig(), getProject()).getIndexPlan(modelId).getAllLayouts()),
+        buildIncrementalLoadQuery("2012-01-02", "2012-01-03", new HashSet<>(getIndexPlan(modelId).getAllLayouts()),
                 modelId);
-        waitAllJobFinish();
+        waitAllJoEnd();
 
         for (TableData tableData : getTableFlow(modelId).getTableDataList()) {
             for (TablePartition partition : tableData.getPartitions()) {
diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
index 9c4fb6c2fc..d10474b43e 100644
--- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
+++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
@@ -326,7 +326,6 @@ public class SecondStorageService extends BasicService implements SecondStorageU
             deleteLayoutChTable(project, modelId, layout.getId());
             EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                 getTablePlan(project, modelId).update(tp -> tp.updatePrimaryIndexColumns(layout.getId(), columns));
-                deleteLayoutChTable(project, modelId, layout.getId());
                 return null;
             }, project, 1, UnitOfWork.DEFAULT_EPOCH_ID);
         }
@@ -370,11 +369,20 @@ public class SecondStorageService extends BasicService implements SecondStorageU
     }
 
     private void deleteLayoutChTable(String project, String modelId, long layoutId) {
-        String database = NameUtil.getDatabase(getConfig(), project);
+        KylinConfig config = getConfig();
+        String database = NameUtil.getDatabase(config, project);
         String table = NameUtil.getTable(modelId, layoutId);
-        for (NodeGroup nodeGroup : SecondStorageUtil.listNodeGroup(getConfig(), project)) {
-            nodeGroup.getNodeNames().forEach(node -> SecondStorageFactoryUtils
-                    .createDatabaseOperator(SecondStorageNodeHelper.resolve(node)).dropTable(database, table));
+        for (NodeGroup nodeGroup : SecondStorageUtil.listNodeGroup(config, project)) {
+            nodeGroup.getNodeNames().forEach(node -> {
+                DatabaseOperator operator = SecondStorageFactoryUtils
+                        .createDatabaseOperator(SecondStorageNodeHelper.resolve(node));
+                try {
+                    operator.dropTable(database, table);
+                } catch (Exception e) {
+                    throw new KylinException(SECOND_STORAGE_NODE_NOT_AVAILABLE,
+                            MsgPicker.getMsg().getSecondStorageNodeNotAvailable(node), e);
+                }
+            });
         }
     }
 
@@ -1149,6 +1157,12 @@ public class SecondStorageService extends BasicService implements SecondStorageU
     private void checkUpdateIndex(String project, String modelId) {
         SecondStorageUtil.validateProjectLock(project, Collections.singletonList(LockTypeEnum.LOAD.name()));
         List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId);
+        if (!jobs.isEmpty()) {
+            throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
+                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+        }
+        jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR),
+                Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES));
         if (!jobs.isEmpty()) {
             throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
                     MsgPicker.getMsg().getSecondStorageConcurrentOperate());