This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2e511129c96b8f6d3a816ae975de838958525b5d Author: Pengfei Zhan <dethr...@gmail.com> AuthorDate: Wed May 17 21:24:51 2023 +0800 KYLIN-5633 [FOLLOW UP] heterogeneous segments pruning works for second-storage layouts --- .../routing/DataflowCapabilityCheckerTest.java | 363 ++++++++++++++++++++- .../query/routing/DataflowCapabilityChecker.java | 3 +- .../kylin/query/routing/QueryLayoutChooser.java | 58 +++- 3 files changed, 410 insertions(+), 14 deletions(-) diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java index 45d07cd629..561fa50380 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java @@ -18,26 +18,46 @@ package org.apache.kylin.query.routing; +import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet; import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; import org.apache.kylin.metadata.cube.cuboid.NLookupCandidate; +import org.apache.kylin.metadata.cube.model.IndexEntity; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataLayout; +import org.apache.kylin.metadata.cube.model.NDataSegDetails; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NDataflowUpdate; +import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.metadata.realization.QueryableSeg; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.util.MetadataTestUtils; import org.apache.kylin.util.OlapContextTestUtil; import org.junit.Assert; import org.junit.Test; +import com.clearspring.analytics.util.Lists; + public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest { + private final long baseLayoutId = IndexEntity.TABLE_INDEX_START_ID + 1; + private final long nonBaseLayoutId = IndexEntity.TABLE_INDEX_START_ID + IndexEntity.INDEX_ID_STEP + 1; + @Test public void testCapabilityResult() throws SqlParseException { NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject()) @@ -52,6 +72,343 @@ public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest { Assert.assertEquals(result.getSelectedCandidate().getCost(), result.getCost(), 0.001); } + @Test + public void testChoosingBetweenHdfsAndSecondStorage() throws SqlParseException { + String modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + addTableIndexes(modelId); + + { + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY group by a.Name "; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), + olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + candidate.getQueryableSeg().setBatchSegments(dataflow.getQueryableSegments()); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertEquals(40001L, ((NLayoutCandidate) result.getSelectedCandidate()).getLayoutEntity().getId()); + } + + // without normal TableIndex, thus choose ch layout + { + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), + olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + queryableSegments.forEach(seg -> { + List<Long> layoutIds = ImmutableList.of(IndexEntity.TABLE_INDEX_START_ID + 1); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertEquals(20000000001L, + ((NLayoutCandidate) result.getSelectedCandidate()).getLayoutEntity().getId()); + } + } + + @Test + public void testWithHighIntegrityCandidateChoosingSecondStorage() throws SqlParseException { + String modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + addTableIndexes(modelId); + Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324"); + clearAndAddDesiredTableIndex(modelId, missingSegIds, nonBaseLayoutId); + + // ch segments aren't heterogeneous + { + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), + olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + queryableSegments.forEach(seg -> { + List<Long> layoutIds = ImmutableList.of(baseLayoutId); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertEquals(20000000001L, + ((NLayoutCandidate) result.getSelectedCandidate()).getLayoutEntity().getId()); + } + + // ch segments are heterogeneous + { + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), + olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + Set<String> missingChSegIds = Sets.newHashSet(); + missingChSegIds.add("d2edf0c5-5eb2-4968-9ad5-09efbf659324"); + missingChSegIds.add("73570f31-05a5-448f-973c-44209830dd01"); + queryableSegments.forEach(seg -> { + if (missingChSegIds.contains(seg.getId())) { + return; + } + List<Long> layoutIds = ImmutableList.of(baseLayoutId); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertEquals(20000010001L, + ((NLayoutCandidate) result.getSelectedCandidate()).getLayoutEntity().getId()); + } + } + + @Test + public void testWithHighIntegrityCandidateChoosingBaseLayoutNonSecondStorage() throws SqlParseException { + String modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324"); + addTableIndexes(modelId); + clearAndAddDesiredTableIndex(modelId, missingSegIds, baseLayoutId); + + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + Set<String> missingChSegIds = Sets.newHashSet(); + missingChSegIds.add("d2edf0c5-5eb2-4968-9ad5-09efbf659324"); + missingChSegIds.add("73570f31-05a5-448f-973c-44209830dd01"); + queryableSegments.forEach(seg -> { + if (missingChSegIds.contains(seg.getId())) { + return; + } + List<Long> layoutIds = ImmutableList.of(baseLayoutId); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertTrue(result.isPartialResult()); + Assert.assertTrue(result.isCapable()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) result.getSelectedCandidate(); + Assert.assertEquals(259200000, selectedCandidate.getRange()); // from ch layout is 172800000 + Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId()); + } + + @Test + public void testWithHighIntegrityCandidateChoosingBaseLayoutSecondStorage() throws SqlParseException { + String modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", + "73570f31-05a5-448f-973c-44209830dd01"); + addTableIndexes(modelId); + clearAndAddDesiredTableIndex(modelId, missingSegIds, baseLayoutId); + + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + Set<String> withoutChSegIdSet = Sets.newHashSet(); + withoutChSegIdSet.add("d2edf0c5-5eb2-4968-9ad5-09efbf659324"); + queryableSegments.forEach(seg -> { + if (withoutChSegIdSet.contains(seg.getId())) { + return; + } + List<Long> layoutIds = ImmutableList.of(baseLayoutId); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertTrue(result.isPartialResult()); + Assert.assertTrue(result.isCapable()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) result.getSelectedCandidate(); + Assert.assertEquals(259200000, selectedCandidate.getRange()); // from normal layout is 172800000 + Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId()); + } + + @Test + public void testWithHighIntegrityCandidateChoosingSecondStorageWithLargeSegEnd() throws SqlParseException { + String modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", + "ff839b0b-2c23-4420-b332-0df70e36c343"); + addTableIndexes(modelId); + clearAndAddDesiredTableIndex(modelId, missingSegIds, baseLayoutId); + + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + Set<String> withoutChSegIdSet = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", + "73570f31-05a5-448f-973c-44209830dd01"); + queryableSegments.forEach(seg -> { + if (withoutChSegIdSet.contains(seg.getId())) { + return; + } + List<Long> layoutIds = ImmutableList.of(baseLayoutId); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertTrue(result.isPartialResult()); + Assert.assertTrue(result.isCapable()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) result.getSelectedCandidate(); + Assert.assertEquals(172800000, selectedCandidate.getRange()); // from ch layout is 172800000 + Assert.assertEquals(1604505600000L, selectedCandidate.getMaxSegEnd()); + Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId()); + } + + @Test + public void testWithHighIntegrityCandidateChoosingNonSecondStorageWithLargeSegEnd() throws SqlParseException { + String modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + Set<String> missingSegIds = ImmutableSet.of("0db919f3-1359-496c-aab5-b6f3951adc0e", + "d2edf0c5-5eb2-4968-9ad5-09efbf659324"); + addTableIndexes(modelId); + clearAndAddDesiredTableIndex(modelId, missingSegIds, baseLayoutId); + + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any"); + MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes"); + String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n" + + " on a.COUNTRY = b.COUNTRY"; + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + QueryableSeg queryableSeg = candidate.getQueryableSeg(); + Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments(); + queryableSeg.setBatchSegments(queryableSegments); + Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap(); + Set<String> missingChSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", + "ff839b0b-2c23-4420-b332-0df70e36c343"); + queryableSegments.forEach(seg -> { + if (missingChSegIds.contains(seg.getId())) { + return; + } + List<Long> layoutIds = ImmutableList.of(baseLayoutId); + chSegToLayouts.putIfAbsent(seg.getId(), Sets.newHashSet()); + chSegToLayouts.get(seg.getId()).addAll(layoutIds); + }); + queryableSeg.setChSegToLayoutsMap(chSegToLayouts); + CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); + Assert.assertTrue(result.isPartialResult()); + Assert.assertTrue(result.isCapable()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) result.getSelectedCandidate(); + Assert.assertEquals(172800000L, selectedCandidate.getRange()); // from normal layout is 172800000 + Assert.assertEquals(1604505600000L, selectedCandidate.getMaxSegEnd()); + Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId()); + } + + private void addTableIndexes(String modelId) { + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + NIndexPlanManager indexMgr = NIndexPlanManager.getInstance(getTestConfig(), getProject()); + indexMgr.updateIndexPlan(modelId, copyForWrite -> { + IndexEntity index = new IndexEntity(); + index.setMeasures(Lists.newArrayList()); + index.setDimensions(ImmutableList.of(0, 1, 2, 3)); + index.setId(IndexEntity.TABLE_INDEX_START_ID); + LayoutEntity layout = new LayoutEntity(); + layout.setId(index.getId() + 1); + layout.setIndex(index); + layout.setColOrder(ImmutableList.of(0, 1, 2, 3)); + layout.setBase(true); + layout.setUpdateTime(System.currentTimeMillis()); + index.setLayouts(ImmutableList.of(layout)); + index.setIndexPlan(copyForWrite); + copyForWrite.getIndexes().add(index); + + IndexEntity otherIndex = new IndexEntity(); + otherIndex.setMeasures(Lists.newArrayList()); + otherIndex.setDimensions(ImmutableList.of(0, 1, 2)); + otherIndex.setId(IndexEntity.TABLE_INDEX_START_ID + IndexEntity.INDEX_ID_STEP); + LayoutEntity otherLayout = new LayoutEntity(); + otherLayout.setId(otherIndex.getId() + 1); + otherLayout.setIndex(otherIndex); + otherLayout.setColOrder(ImmutableList.of(0, 1, 2)); + otherLayout.setBase(true); + otherLayout.setUpdateTime(System.currentTimeMillis()); + otherIndex.setLayouts(ImmutableList.of(otherLayout)); + otherIndex.setIndexPlan(copyForWrite); + copyForWrite.getIndexes().add(otherIndex); + }); + return null; + }, getProject()); + } + + private void clearAndAddDesiredTableIndex(String modelId, Set<String> missingSegIds, long desiredLayoutId) { + EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), getProject()); + NDataflow df = dfMgr.getDataflow(modelId); + List<NDataLayout> toAddLayouts = Lists.newArrayList(); + List<NDataLayout> toRemoveLayouts = Lists.newArrayList(); + for (NDataSegment segment : df.getSegments()) { + NDataSegDetails segDetails = segment.getSegDetails(); + toRemoveLayouts.addAll(segDetails.getAllLayouts()); + if (!missingSegIds.contains(segment.getId())) { + NDataLayout layout = NDataLayout.newDataLayout(segDetails, desiredLayoutId); + layout.setRows(100); + toAddLayouts.add(layout); + } + } + + // update + NDataflowUpdate dataflowUpdate = new NDataflowUpdate(modelId); + dataflowUpdate.setToRemoveLayouts(toRemoveLayouts.toArray(new NDataLayout[0])); + dataflowUpdate.setToAddOrUpdateLayouts(toAddLayouts.toArray(new NDataLayout[0])); + dfMgr.updateDataflow(dataflowUpdate); + return null; + }, getProject()); + } + @Test public void testLookupMatch() throws SqlParseException { NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject()) @@ -65,7 +422,8 @@ public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest { { String sql = "select SITE_ID from EDW.TEST_SITES"; OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql).get(0); - Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), + olapContext); olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); @@ -79,7 +437,8 @@ public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest { { String sql = "select sum(SITE_ID) from EDW.TEST_SITES"; OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql).get(0); - Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), + olapContext); olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); CapabilityResult result = DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest()); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java index f099544160..d0e5640d76 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java @@ -84,8 +84,7 @@ public class DataflowCapabilityChecker { digest, secondStorageSegmentLayoutMap); } else if (candidateAndInfluence == null) { log.debug("select the layout candidate with high data integrity."); - candidateAndInfluence = QueryLayoutChooser.selectHighIntegrityCandidate(dataflow, prunedSegments, - digest); + candidateAndInfluence = QueryLayoutChooser.selectHighIntegrityCandidate(dataflow, candidate, digest); if (candidateAndInfluence != null) { result.setPartialResult(true); } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java index 77fea3c143..3a0d29fa4e 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; @@ -157,8 +158,9 @@ public class QueryLayoutChooser { return rangeAndLatest; } - public static NLayoutCandidate selectHighIntegrityCandidate(NDataflow dataflow, List<NDataSegment> prunedSegments, + public static NLayoutCandidate selectHighIntegrityCandidate(NDataflow dataflow, Candidate candidate, SQLDigest digest) { + List<NDataSegment> prunedSegments = candidate.getPrunedSegments(dataflow); if (!QueryRouter.isVacantIndexPruningEnabled(NProjectManager.getProjectConfig(dataflow.getProject()))) { return null; } @@ -172,14 +174,8 @@ public class QueryLayoutChooser { return null; } - Map<Long, List<NDataLayout>> idToDataLayoutsMap = Maps.newHashMap(); - for (NDataSegment segment : prunedSegments) { - segment.getLayoutsMap().forEach((id, dataLayout) -> { - idToDataLayoutsMap.putIfAbsent(id, Lists.newArrayList()); - idToDataLayoutsMap.get(id).add(dataLayout); - }); - } - + Map<Long, List<NDataLayout>> idToDataLayoutsMap = getCommonLayouts(dataflow, chooserContext, candidate, + prunedSegments); List<NLayoutCandidate> allLayoutCandidates = QueryLayoutChooser.collectAllLayoutCandidates(dataflow, chooserContext, idToDataLayoutsMap); return chooseBestLayoutCandidate(dataflow, digest, chooserContext, allLayoutCandidates, @@ -203,6 +199,47 @@ public class QueryLayoutChooser { return allLayoutCandidates.get(0); } + private static Map<Long, List<NDataLayout>> getCommonLayouts(NDataflow dataflow, ChooserContext chooserContext, + Candidate candidate, List<NDataSegment> prunedSegments) { + Map<Long, List<NDataLayout>> idToDataLayoutsMap = Maps.newHashMap(); + for (NDataSegment segment : prunedSegments) { + segment.getLayoutsMap().forEach((id, dataLayout) -> { + idToDataLayoutsMap.putIfAbsent(id, Lists.newArrayList()); + idToDataLayoutsMap.get(id).add(dataLayout); + }); + } + + KylinConfig projectConfig = NProjectManager.getProjectConfig(dataflow.getProject()); + String segmentOnlineMode = projectConfig.getKylinEngineSegmentOnlineMode(); + Map<String, Set<Long>> chSegToLayoutsMap = candidate.getChSegToLayoutsMap(dataflow); + if (SegmentOnlineMode.ANY.toString().equalsIgnoreCase(segmentOnlineMode) + && MapUtils.isNotEmpty(chSegToLayoutsMap)) { + Map<Long, List<NDataLayout>> chLayoutsMap = Maps.newHashMap(); + chSegToLayoutsMap.forEach((segId, chLayoutIds) -> chLayoutIds.forEach(id -> { + NDataLayout dataLayout = NDataLayout.newDataLayout(dataflow, segId, id); + chLayoutsMap.putIfAbsent(id, Lists.newArrayList()); + chLayoutsMap.get(id).add(dataLayout); + })); + chLayoutsMap.forEach((layoutId, chLayouts) -> { + if (!idToDataLayoutsMap.containsKey(layoutId)) { + idToDataLayoutsMap.put(layoutId, chLayouts); + } else { + List<NDataLayout> normalLayouts = idToDataLayoutsMap.get(layoutId); + long[] normalRangeAndMax = calcSegRangeAndMaxEnd(chooserContext, dataflow, normalLayouts); + long[] chRangeAndMax = calcSegRangeAndMaxEnd(chooserContext, dataflow, chLayouts); + // CH Layouts is more preferred: + // 1. with bigger built data range + // 2. have the latest build data + if ((normalRangeAndMax[0] < chRangeAndMax[0]) + || (normalRangeAndMax[0] == chRangeAndMax[0] && normalRangeAndMax[1] <= chRangeAndMax[1])) { + idToDataLayoutsMap.put(layoutId, chLayouts); + } + } + }); + } + return idToDataLayoutsMap; + } + private static Collection<NDataLayout> getCommonLayouts(List<NDataSegment> segments, NDataflow dataflow, Map<String, Set<Long>> chSegmentToLayoutsMap) { KylinConfig projectConfig = NProjectManager.getProjectConfig(dataflow.getProject()); @@ -221,10 +258,11 @@ public class QueryLayoutChooser { var layoutIdMapToDataLayout = dataSegment.getLayoutsMap(); if (SegmentOnlineMode.ANY.toString().equalsIgnoreCase(segmentOnlineMode) && MapUtils.isNotEmpty(chSegmentToLayoutsMap)) { + // Only the basic TableIndex is built in the CH storage Set<Long> chLayouts = chSegmentToLayoutsMap.getOrDefault(dataSegment.getId(), Sets.newHashSet()); Map<Long, NDataLayout> nDataLayoutMap = chLayouts.stream() .map(id -> NDataLayout.newDataLayout(dataflow, dataSegment.getId(), id)) - .collect(Collectors.toMap(NDataLayout::getLayoutId, nDataLayout -> nDataLayout)); + .collect(Collectors.toMap(NDataLayout::getLayoutId, Function.identity())); nDataLayoutMap.putAll(layoutIdMapToDataLayout); layoutIdMapToDataLayout = nDataLayoutMap;