This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2e511129c96b8f6d3a816ae975de838958525b5d
Author: Pengfei Zhan <dethr...@gmail.com>
AuthorDate: Wed May 17 21:24:51 2023 +0800

    KYLIN-5633 [FOLLOW UP] heterogeneous segment pruning works for second-storage layouts
---
 .../routing/DataflowCapabilityCheckerTest.java     | 363 ++++++++++++++++++++-
 .../query/routing/DataflowCapabilityChecker.java   |   3 +-
 .../kylin/query/routing/QueryLayoutChooser.java    |  58 +++-
 3 files changed, 410 insertions(+), 14 deletions(-)

diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java
index 45d07cd629..561fa50380 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java
@@ -18,26 +18,46 @@
 
 package org.apache.kylin.query.routing;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.cuboid.NLookupCandidate;
+import org.apache.kylin.metadata.cube.model.IndexEntity;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegDetails;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
+import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.QueryableSeg;
 import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.util.MetadataTestUtils;
 import org.apache.kylin.util.OlapContextTestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.clearspring.analytics.util.Lists;
+
 public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest {
 
+    // Layout id of the first table index created by addTableIndexes (dimensions 0-3); the tests register
+    // this layout as the one held in second storage (CH).
+    // NOTE(review): these are per-instance constants; `private static final` would express that better.
+    private final long baseLayoutId = IndexEntity.TABLE_INDEX_START_ID + 1;
+    // Layout id of the second table index created by addTableIndexes (dimensions 0-2).
+    // NOTE(review): despite the name, addTableIndexes also marks this layout with setBase(true) — confirm intent.
+    private final long nonBaseLayoutId = IndexEntity.TABLE_INDEX_START_ID + IndexEntity.INDEX_ID_STEP + 1;
+
     @Test
     public void testCapabilityResult() throws SqlParseException {
         NDataflow dataflow = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject())
@@ -52,6 +72,343 @@ public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest {
         Assert.assertEquals(result.getSelectedCandidate().getCost(), 
result.getCost(), 0.001);
     }
 
+    /** Model used by the second-storage (CH) layout-selection tests below. */
+    private static final String MODEL_ID = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
+
+    /** Non-aggregate join query shared by the second-storage tests. */
+    private static final String JOIN_SQL = "select a.NAME from TEST_BANK_INCOME a "
+            + "inner join TEST_BANK_LOCATION b \n on a.COUNTRY = b.COUNTRY";
+
+    @Test
+    public void testChoosingBetweenHdfsAndSecondStorage() throws SqlParseException {
+        addTableIndexes(MODEL_ID);
+
+        // group-by query, only batch (HDFS) segments registered: layout 40001 is chosen
+        {
+            String sql = "select a.NAME from TEST_BANK_INCOME a inner join TEST_BANK_LOCATION b \n"
+                    + " on a.COUNTRY = b.COUNTRY group by a.Name ";
+            CapabilityResult result = checkCapability(MODEL_ID, sql, null);
+            Assert.assertEquals(40001L, selectedLayoutId(result));
+        }
+
+        // without normal TableIndex, thus choose ch layout
+        {
+            MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any");
+            CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL, ImmutableSet.of());
+            Assert.assertEquals(baseLayoutId, selectedLayoutId(result));
+        }
+    }
+
+    @Test
+    public void testWithHighIntegrityCandidateChoosingSecondStorage() throws SqlParseException {
+        addTableIndexes(MODEL_ID);
+        // HDFS holds only the non-base table layout, and it is missing from one segment
+        Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324");
+        clearAndAddDesiredTableIndex(MODEL_ID, missingSegIds, nonBaseLayoutId);
+
+        // ch segments aren't heterogeneous: CH holds the base layout on every segment
+        {
+            enableAnyOnlineModeAndVacantIndexes();
+            CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL, ImmutableSet.of());
+            Assert.assertEquals(baseLayoutId, selectedLayoutId(result));
+        }
+
+        // ch segments are heterogeneous: CH misses two segments while HDFS misses only one
+        {
+            enableAnyOnlineModeAndVacantIndexes();
+            CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL, ImmutableSet
+                    .of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", "73570f31-05a5-448f-973c-44209830dd01"));
+            Assert.assertEquals(nonBaseLayoutId, selectedLayoutId(result));
+        }
+    }
+
+    @Test
+    public void testWithHighIntegrityCandidateChoosingBaseLayoutNonSecondStorage() throws SqlParseException {
+        // HDFS misses one segment, CH misses two: the HDFS data layouts cover the larger range
+        Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324");
+        addTableIndexes(MODEL_ID);
+        clearAndAddDesiredTableIndex(MODEL_ID, missingSegIds, baseLayoutId);
+        enableAnyOnlineModeAndVacantIndexes();
+
+        CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL, ImmutableSet
+                .of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", "73570f31-05a5-448f-973c-44209830dd01"));
+        NLayoutCandidate selectedCandidate = assertPartialAndCapable(result);
+        Assert.assertEquals(259200000, selectedCandidate.getRange()); // the CH layouts would cover only 172800000
+        Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId());
+    }
+
+    @Test
+    public void testWithHighIntegrityCandidateChoosingBaseLayoutSecondStorage() throws SqlParseException {
+        // HDFS misses two segments, CH misses one: the CH data layouts cover the larger range
+        Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324",
+                "73570f31-05a5-448f-973c-44209830dd01");
+        addTableIndexes(MODEL_ID);
+        clearAndAddDesiredTableIndex(MODEL_ID, missingSegIds, baseLayoutId);
+        enableAnyOnlineModeAndVacantIndexes();
+
+        CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL,
+                ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324"));
+        NLayoutCandidate selectedCandidate = assertPartialAndCapable(result);
+        Assert.assertEquals(259200000, selectedCandidate.getRange()); // the HDFS layouts would cover only 172800000
+        Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId());
+    }
+
+    @Test
+    public void testWithHighIntegrityCandidateChoosingSecondStorageWithLargeSegEnd() throws SqlParseException {
+        // HDFS and CH each miss two segments; CH is expected to win on the later max segment end
+        Set<String> missingSegIds = ImmutableSet.of("d2edf0c5-5eb2-4968-9ad5-09efbf659324",
+                "ff839b0b-2c23-4420-b332-0df70e36c343");
+        addTableIndexes(MODEL_ID);
+        clearAndAddDesiredTableIndex(MODEL_ID, missingSegIds, baseLayoutId);
+        enableAnyOnlineModeAndVacantIndexes();
+
+        CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL, ImmutableSet
+                .of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", "73570f31-05a5-448f-973c-44209830dd01"));
+        NLayoutCandidate selectedCandidate = assertPartialAndCapable(result);
+        Assert.assertEquals(172800000, selectedCandidate.getRange());
+        Assert.assertEquals(1604505600000L, selectedCandidate.getMaxSegEnd());
+        Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId());
+    }
+
+    @Test
+    public void testWithHighIntegrityCandidateChoosingNonSecondStorageWithLargeSegEnd() throws SqlParseException {
+        // HDFS and CH each miss two segments; HDFS is expected to win on the later max segment end
+        Set<String> missingSegIds = ImmutableSet.of("0db919f3-1359-496c-aab5-b6f3951adc0e",
+                "d2edf0c5-5eb2-4968-9ad5-09efbf659324");
+        addTableIndexes(MODEL_ID);
+        clearAndAddDesiredTableIndex(MODEL_ID, missingSegIds, baseLayoutId);
+        enableAnyOnlineModeAndVacantIndexes();
+
+        CapabilityResult result = checkCapability(MODEL_ID, JOIN_SQL, ImmutableSet
+                .of("d2edf0c5-5eb2-4968-9ad5-09efbf659324", "ff839b0b-2c23-4420-b332-0df70e36c343"));
+        NLayoutCandidate selectedCandidate = assertPartialAndCapable(result);
+        Assert.assertEquals(172800000L, selectedCandidate.getRange());
+        Assert.assertEquals(1604505600000L, selectedCandidate.getMaxSegEnd());
+        Assert.assertEquals(baseLayoutId, selectedCandidate.getLayoutEntity().getId());
+    }
+
+    /** Sets the project to segment-online-mode "any" and index-match-rules "use-vacant-indexes". */
+    private void enableAnyOnlineModeAndVacantIndexes() {
+        MetadataTestUtils.updateProjectConfig(getProject(), "kylin.engine.segment-online-mode", "any");
+        MetadataTestUtils.updateProjectConfig(getProject(), "kylin.query.index-match-rules", "use-vacant-indexes");
+    }
+
+    /**
+     * Builds a {@link Candidate} for {@code sql} on the given model, registers all queryable segments as
+     * batch segments, and runs {@link DataflowCapabilityChecker#check} on it.
+     *
+     * @param modelId         model / dataflow id
+     * @param sql             query whose first OLAP context is checked
+     * @param segIdsWithoutCh {@code null} to register no second-storage layouts at all; otherwise every
+     *                        queryable segment whose id is NOT in this set is registered as holding
+     *                        {@code baseLayoutId} in second storage
+     * @return the capability result of the check
+     */
+    private CapabilityResult checkCapability(String modelId, String sql, Set<String> segIdsWithoutCh)
+            throws SqlParseException {
+        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId);
+        OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(getProject(), sql, true).get(0);
+        Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(),
+                olapContext);
+        olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap);
+        Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap);
+        QueryableSeg queryableSeg = candidate.getQueryableSeg();
+        Segments<NDataSegment> queryableSegments = dataflow.getQueryableSegments();
+        queryableSeg.setBatchSegments(queryableSegments);
+        if (segIdsWithoutCh != null) {
+            Map<String, Set<Long>> chSegToLayouts = Maps.newHashMap();
+            queryableSegments.forEach(seg -> {
+                if (!segIdsWithoutCh.contains(seg.getId())) {
+                    chSegToLayouts.computeIfAbsent(seg.getId(), id -> Sets.newHashSet()).add(baseLayoutId);
+                }
+            });
+            queryableSeg.setChSegToLayoutsMap(chSegToLayouts);
+        }
+        return DataflowCapabilityChecker.check(dataflow, candidate, olapContext.getSQLDigest());
+    }
+
+    /** Returns the layout id of the selected candidate, which is expected to be an {@link NLayoutCandidate}. */
+    private static long selectedLayoutId(CapabilityResult result) {
+        return ((NLayoutCandidate) result.getSelectedCandidate()).getLayoutEntity().getId();
+    }
+
+    /** Asserts the result is a capable partial result and returns its selected layout candidate. */
+    private static NLayoutCandidate assertPartialAndCapable(CapabilityResult result) {
+        Assert.assertTrue(result.isPartialResult());
+        Assert.assertTrue(result.isCapable());
+        return (NLayoutCandidate) result.getSelectedCandidate();
+    }
+
+    /**
+     * Adds two table indexes to the model's index plan in one transaction: index TABLE_INDEX_START_ID over
+     * dimensions 0-3 (layout {@code baseLayoutId}) and index TABLE_INDEX_START_ID + INDEX_ID_STEP over
+     * dimensions 0-2 (layout {@code nonBaseLayoutId}).
+     * NOTE(review): {@code Lists} resolves to the com.clearspring.analytics.util.Lists import rather than the
+     * shaded Guava Lists used elsewhere in this file; consider switching that import.
+     */
+    private void addTableIndexes(String modelId) {
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            NIndexPlanManager indexMgr = NIndexPlanManager.getInstance(getTestConfig(), getProject());
+            indexMgr.updateIndexPlan(modelId, copyForWrite -> {
+                IndexEntity index = new IndexEntity();
+                index.setMeasures(Lists.newArrayList());
+                index.setDimensions(ImmutableList.of(0, 1, 2, 3));
+                index.setId(IndexEntity.TABLE_INDEX_START_ID);
+                LayoutEntity layout = new LayoutEntity();
+                layout.setId(index.getId() + 1);
+                layout.setIndex(index);
+                layout.setColOrder(ImmutableList.of(0, 1, 2, 3));
+                layout.setBase(true);
+                layout.setUpdateTime(System.currentTimeMillis());
+                index.setLayouts(ImmutableList.of(layout));
+                index.setIndexPlan(copyForWrite);
+                copyForWrite.getIndexes().add(index);
+
+                IndexEntity otherIndex = new IndexEntity();
+                otherIndex.setMeasures(Lists.newArrayList());
+                otherIndex.setDimensions(ImmutableList.of(0, 1, 2));
+                otherIndex.setId(IndexEntity.TABLE_INDEX_START_ID + IndexEntity.INDEX_ID_STEP);
+                LayoutEntity otherLayout = new LayoutEntity();
+                otherLayout.setId(otherIndex.getId() + 1);
+                otherLayout.setIndex(otherIndex);
+                otherLayout.setColOrder(ImmutableList.of(0, 1, 2));
+                // NOTE(review): this is the "non-base" layout, yet it is flagged as base too — confirm intent
+                otherLayout.setBase(true);
+                otherLayout.setUpdateTime(System.currentTimeMillis());
+                otherIndex.setLayouts(ImmutableList.of(otherLayout));
+                otherIndex.setIndexPlan(copyForWrite);
+                copyForWrite.getIndexes().add(otherIndex);
+            });
+            return null;
+        }, getProject());
+    }
+
+    /**
+     * Removes every data layout from every segment of the dataflow, then adds back only
+     * {@code desiredLayoutId} (with 100 rows) on the segments whose id is not in {@code missingSegIds}.
+     */
+    private void clearAndAddDesiredTableIndex(String modelId, Set<String> missingSegIds, long desiredLayoutId) {
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            NDataflowManager dfMgr = NDataflowManager.getInstance(getTestConfig(), getProject());
+            NDataflow df = dfMgr.getDataflow(modelId);
+            List<NDataLayout> toAddLayouts = Lists.newArrayList();
+            List<NDataLayout> toRemoveLayouts = Lists.newArrayList();
+            for (NDataSegment segment : df.getSegments()) {
+                NDataSegDetails segDetails = segment.getSegDetails();
+                toRemoveLayouts.addAll(segDetails.getAllLayouts());
+                if (!missingSegIds.contains(segment.getId())) {
+                    NDataLayout layout = NDataLayout.newDataLayout(segDetails, desiredLayoutId);
+                    layout.setRows(100);
+                    toAddLayouts.add(layout);
+                }
+            }
+
+            // apply removals and additions in a single dataflow update
+            NDataflowUpdate dataflowUpdate = new NDataflowUpdate(modelId);
+            dataflowUpdate.setToRemoveLayouts(toRemoveLayouts.toArray(new NDataLayout[0]));
+            dataflowUpdate.setToAddOrUpdateLayouts(toAddLayouts.toArray(new NDataLayout[0]));
+            dfMgr.updateDataflow(dataflowUpdate);
+            return null;
+        }, getProject());
+    }
+
     @Test
     public void testLookupMatch() throws SqlParseException {
         NDataflow dataflow = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject())
@@ -65,7 +422,8 @@ public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest {
         {
             String sql = "select SITE_ID from EDW.TEST_SITES";
             OLAPContext olapContext = 
OlapContextTestUtil.getOlapContexts(getProject(), sql).get(0);
-            Map<String, String> sqlAlias2ModelNameMap = 
OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext);
+            Map<String, String> sqlAlias2ModelNameMap = 
OlapContextTestUtil.matchJoins(dataflow.getModel(),
+                    olapContext);
             olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap);
             Candidate candidate = new Candidate(dataflow, olapContext, 
sqlAlias2ModelNameMap);
             CapabilityResult result = 
DataflowCapabilityChecker.check(dataflow, candidate, 
olapContext.getSQLDigest());
@@ -79,7 +437,8 @@ public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest {
         {
             String sql = "select sum(SITE_ID) from EDW.TEST_SITES";
             OLAPContext olapContext = 
OlapContextTestUtil.getOlapContexts(getProject(), sql).get(0);
-            Map<String, String> sqlAlias2ModelNameMap = 
OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext);
+            Map<String, String> sqlAlias2ModelNameMap = 
OlapContextTestUtil.matchJoins(dataflow.getModel(),
+                    olapContext);
             olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap);
             Candidate candidate = new Candidate(dataflow, olapContext, 
sqlAlias2ModelNameMap);
             CapabilityResult result = 
DataflowCapabilityChecker.check(dataflow, candidate, 
olapContext.getSQLDigest());
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java
index f099544160..d0e5640d76 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/DataflowCapabilityChecker.java
@@ -84,8 +84,7 @@ public class DataflowCapabilityChecker {
                         digest, secondStorageSegmentLayoutMap);
             } else if (candidateAndInfluence == null) {
                 log.debug("select the layout candidate with high data 
integrity.");
-                candidateAndInfluence = 
QueryLayoutChooser.selectHighIntegrityCandidate(dataflow, prunedSegments,
-                        digest);
+                candidateAndInfluence = 
QueryLayoutChooser.selectHighIntegrityCandidate(dataflow, candidate, digest);
                 if (candidateAndInfluence != null) {
                     result.setPartialResult(true);
                 }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java
index 77fea3c143..3a0d29fa4e 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/QueryLayoutChooser.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -157,8 +158,9 @@ public class QueryLayoutChooser {
         return rangeAndLatest;
     }
 
-    public static NLayoutCandidate selectHighIntegrityCandidate(NDataflow 
dataflow, List<NDataSegment> prunedSegments,
+    public static NLayoutCandidate selectHighIntegrityCandidate(NDataflow 
dataflow, Candidate candidate,
             SQLDigest digest) {
+        List<NDataSegment> prunedSegments = 
candidate.getPrunedSegments(dataflow);
         if 
(!QueryRouter.isVacantIndexPruningEnabled(NProjectManager.getProjectConfig(dataflow.getProject())))
 {
             return null;
         }
@@ -172,14 +174,8 @@ public class QueryLayoutChooser {
             return null;
         }
 
-        Map<Long, List<NDataLayout>> idToDataLayoutsMap = Maps.newHashMap();
-        for (NDataSegment segment : prunedSegments) {
-            segment.getLayoutsMap().forEach((id, dataLayout) -> {
-                idToDataLayoutsMap.putIfAbsent(id, Lists.newArrayList());
-                idToDataLayoutsMap.get(id).add(dataLayout);
-            });
-        }
-
+        Map<Long, List<NDataLayout>> idToDataLayoutsMap = 
getCommonLayouts(dataflow, chooserContext, candidate,
+                prunedSegments);
         List<NLayoutCandidate> allLayoutCandidates = 
QueryLayoutChooser.collectAllLayoutCandidates(dataflow,
                 chooserContext, idToDataLayoutsMap);
         return chooseBestLayoutCandidate(dataflow, digest, chooserContext, 
allLayoutCandidates,
@@ -203,6 +199,47 @@ public class QueryLayoutChooser {
         return allLayoutCandidates.get(0);
     }
 
+    /**
+     * Collects, per layout id, the data layouts used to evaluate that layout during high-integrity
+     * candidate selection.
+     *
+     * <p>The HDFS data layouts of the pruned segments are grouped by layout id first. When the project's
+     * segment online mode is {@code ANY} and the candidate has second-storage (CH) layouts, each CH layout
+     * id either fills in a layout id that has no HDFS data at all, or replaces the HDFS data layouts when
+     * the CH ones are preferred (see {@link #preferChLayouts}). HDFS and CH data layouts of the same
+     * layout id are never mixed.
+     *
+     * @param dataflow       the dataflow being queried
+     * @param chooserContext context passed through to {@code calcSegRangeAndMaxEnd}
+     * @param candidate      supplies the CH segment-to-layouts mapping
+     * @param prunedSegments segments left after pruning, whose HDFS data layouts are collected
+     * @return layout id mapped to the data layouts (all HDFS or all CH) chosen for it
+     */
+    private static Map<Long, List<NDataLayout>> getCommonLayouts(NDataflow dataflow, ChooserContext chooserContext,
+            Candidate candidate, List<NDataSegment> prunedSegments) {
+        Map<Long, List<NDataLayout>> idToDataLayoutsMap = Maps.newHashMap();
+        for (NDataSegment segment : prunedSegments) {
+            segment.getLayoutsMap().forEach((id, dataLayout) -> idToDataLayoutsMap
+                    .computeIfAbsent(id, key -> Lists.newArrayList()).add(dataLayout));
+        }
+
+        KylinConfig projectConfig = NProjectManager.getProjectConfig(dataflow.getProject());
+        String segmentOnlineMode = projectConfig.getKylinEngineSegmentOnlineMode();
+        Map<String, Set<Long>> chSegToLayoutsMap = candidate.getChSegToLayoutsMap(dataflow);
+        boolean useChLayouts = SegmentOnlineMode.ANY.toString().equalsIgnoreCase(segmentOnlineMode)
+                && MapUtils.isNotEmpty(chSegToLayoutsMap);
+        if (!useChLayouts) {
+            return idToDataLayoutsMap;
+        }
+
+        // CH layout id -> one CH data layout for every segment that holds it in second storage
+        Map<Long, List<NDataLayout>> chLayoutsMap = Maps.newHashMap();
+        chSegToLayoutsMap.forEach((segId, chLayoutIds) -> chLayoutIds.forEach(id -> chLayoutsMap
+                .computeIfAbsent(id, key -> Lists.newArrayList()).add(NDataLayout.newDataLayout(dataflow, segId, id))));
+
+        chLayoutsMap.forEach((layoutId, chLayouts) -> {
+            if (!idToDataLayoutsMap.containsKey(layoutId)) {
+                // no HDFS data for this layout: the CH data layouts are the only option
+                idToDataLayoutsMap.put(layoutId, chLayouts);
+                return;
+            }
+            long[] normalRangeAndMax = calcSegRangeAndMaxEnd(chooserContext, dataflow,
+                    idToDataLayoutsMap.get(layoutId));
+            long[] chRangeAndMax = calcSegRangeAndMaxEnd(chooserContext, dataflow, chLayouts);
+            if (preferChLayouts(normalRangeAndMax, chRangeAndMax)) {
+                idToDataLayoutsMap.put(layoutId, chLayouts);
+            }
+        });
+        return idToDataLayoutsMap;
+    }
+
+    /**
+     * Decides whether CH data layouts should replace the HDFS ones for the same layout id. CH layouts are
+     * preferred when they cover a bigger built data range, or the same range with the latest built data
+     * (a max segment end that is not earlier; ties go to CH).
+     *
+     * @param normalRangeAndMax {@code [builtRange, maxSegEnd]} of the HDFS data layouts
+     * @param chRangeAndMax     {@code [builtRange, maxSegEnd]} of the CH data layouts
+     * @return {@code true} if the CH data layouts should be used
+     */
+    private static boolean preferChLayouts(long[] normalRangeAndMax, long[] chRangeAndMax) {
+        if (normalRangeAndMax[0] != chRangeAndMax[0]) {
+            return normalRangeAndMax[0] < chRangeAndMax[0];
+        }
+        return normalRangeAndMax[1] <= chRangeAndMax[1];
+    }
+
     private static Collection<NDataLayout> getCommonLayouts(List<NDataSegment> 
segments, NDataflow dataflow,
             Map<String, Set<Long>> chSegmentToLayoutsMap) {
         KylinConfig projectConfig = 
NProjectManager.getProjectConfig(dataflow.getProject());
@@ -221,10 +258,11 @@ public class QueryLayoutChooser {
             var layoutIdMapToDataLayout = dataSegment.getLayoutsMap();
             if 
(SegmentOnlineMode.ANY.toString().equalsIgnoreCase(segmentOnlineMode)
                     && MapUtils.isNotEmpty(chSegmentToLayoutsMap)) {
+                // Only the basic TableIndex is built in the CH storage
                 Set<Long> chLayouts = 
chSegmentToLayoutsMap.getOrDefault(dataSegment.getId(), Sets.newHashSet());
                 Map<Long, NDataLayout> nDataLayoutMap = chLayouts.stream()
                         .map(id -> NDataLayout.newDataLayout(dataflow, 
dataSegment.getId(), id))
-                        .collect(Collectors.toMap(NDataLayout::getLayoutId, 
nDataLayout -> nDataLayout));
+                        .collect(Collectors.toMap(NDataLayout::getLayoutId, 
Function.identity()));
 
                 nDataLayoutMap.putAll(layoutIdMapToDataLayout);
                 layoutIdMapToDataLayout = nDataLayoutMap;

Reply via email to