This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e804eee81eff19424ea61c6899c226ba3d5fcf59
Author: Pengfei Zhan <dethr...@gmail.com>
AuthorDate: Sun Oct 1 17:19:17 2023 +0800

    KYLIN-5844 & KYLIN-5843 Column ACL works for PushDown
    
    1. Column ACL works for PushDown
    2. Flat-table sql should consider acl
---
 .../apache/kylin/metadata/model/ColumnDesc.java    |   5 +-
 .../apache/kylin/rest/util/AclPermissionUtil.java  |   2 +-
 .../kylin/rest/response/ExecutableResponse.java    |   4 +-
 .../routing/DataflowCapabilityCheckerTest.java     |   3 +-
 .../kylin/query/rules/AggPushdownRuleTest.java     |   4 +-
 .../kylin/query/rules/CalciteRuleTestBase.java     |  12 +-
 .../rest/response/LayoutRecDetailResponse.java     |   2 +-
 .../apache/kylin/rest/service/ModelService.java    |  10 +-
 .../rest/service/params/BasicSegmentParams.java    |   3 +-
 .../security/HackSelectStarWithColumnACL.java      | 259 +++++++------
 .../org/apache/kylin/query/util/PushDownUtil.java  |  32 +-
 .../security/HackSelectStarWithColumnACLTest.java  | 405 ++++++++++++++-------
 .../apache/kylin/query/util/PushDownUtilTest.java  | 359 ++++++++++--------
 .../merger/AfterMergeOrRefreshResourceMerger.java  |   3 +-
 .../spark/source/NSparkMetadataExplorer.java       |   3 +-
 .../engine/spark/job/NSparkMergingJobTest.java     |  10 +-
 16 files changed, 705 insertions(+), 411 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index c2efaccd4e..1754104ae4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import org.apache.calcite.avatica.util.Quoting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.StringHelper;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -30,7 +31,6 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSetter;
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -231,8 +231,7 @@ public class ColumnDesc implements Serializable {
     // for test mainly
     public static ColumnDesc mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
         ColumnDesc desc = new ColumnDesc();
-        String id = "" + oneBasedColumnIndex;
-        desc.setId(id);
+        desc.setId(String.valueOf(oneBasedColumnIndex));
         desc.setName(name);
         desc.setDatatype(datatype);
         desc.init(table);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/rest/util/AclPermissionUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/rest/util/AclPermissionUtil.java
index 36360a0f12..82aaca58dc 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/rest/util/AclPermissionUtil.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/rest/util/AclPermissionUtil.java
@@ -235,7 +235,7 @@ public class AclPermissionUtil {
     }
 
     public static QueryContext.AclInfo createAclInfo(String project, Set<String> groups) {
-        return new QueryContext.AclInfo(getCurrentUsername(), groups, isAdminInProject(project, groups));
+        return new QueryContext.AclInfo(getCurrentUsername(), groups, hasProjectAdminPermission(project, groups));
     }
 
     public static boolean hasExtPermission(Permission permission) {
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
index 70144b83a2..6736c2bb74 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/response/ExecutableResponse.java
@@ -27,6 +27,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.engine.spark.job.NSparkSnapshotJob;
 import org.apache.kylin.engine.spark.job.NTableSamplingJob;
 import org.apache.kylin.job.SecondStorageCleanJobUtil;
@@ -43,11 +45,9 @@ import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
 import org.apache.kylin.metadata.model.TableDesc;
 
-import com.clearspring.analytics.util.Lists;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonUnwrapped;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java
index 561fa50380..1865372e8e 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/DataflowCapabilityCheckerTest.java
@@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
 import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
@@ -51,8 +52,6 @@ import org.apache.kylin.util.OlapContextTestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.clearspring.analytics.util.Lists;
-
 public class DataflowCapabilityCheckerTest extends NLocalWithSparkSessionTest {
 
     private final long baseLayoutId = IndexEntity.TABLE_INDEX_START_ID + 1;
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/AggPushdownRuleTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/AggPushdownRuleTest.java
index dfe7f9fca3..eec9bb41bf 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/AggPushdownRuleTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/AggPushdownRuleTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.kylin.query.rules;
 
 import java.io.IOException;
@@ -28,12 +27,11 @@ import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.test.DiffRepository;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.clearspring.analytics.util.Lists;
-
 import io.kyligence.kap.query.optrule.KapAggFilterTransposeRule;
 import io.kyligence.kap.query.optrule.KapAggJoinTransposeRule;
 import io.kyligence.kap.query.optrule.KapAggProjectMergeRule;
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
index 01a96c118f..f152f903b4 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
@@ -20,7 +20,6 @@ package org.apache.kylin.query.rules;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -42,6 +41,8 @@ import org.apache.commons.lang3.NotImplementedException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.engine.QueryOptimizer;
 import org.apache.kylin.query.util.HepUtils;
@@ -53,9 +54,6 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.clearspring.analytics.util.Lists;
-import org.apache.kylin.guava30.shaded.common.base.Strings;
-
 public class CalciteRuleTestBase extends NLocalFileMetadataTestCase {
     private static final Logger logger = LoggerFactory.getLogger(CalciteRuleTestBase.class);
 
@@ -183,13 +181,13 @@ public class CalciteRuleTestBase extends NLocalFileMetadataTestCase {
     }
 
     protected void checkPlanning(RelNode relBefore, RelNode relAfter, String prefix, boolean unchanged) {
-        assertThat(relBefore, notNullValue());
-        assertThat(relAfter, notNullValue());
+        Assert.assertThat(relBefore, notNullValue());
+        Assert.assertThat(relAfter, notNullValue());
         final String planBefore = NL + RelOptUtil.toString(relBefore);
         final String planAfter = NL + RelOptUtil.toString(relAfter);
 
         if (unchanged) {
-            assertThat(planAfter, is(planBefore));
+            Assert.assertThat(planAfter, is(planBefore));
         } else {
             checkDiff(relBefore, relAfter, prefix);
         }
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/LayoutRecDetailResponse.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/LayoutRecDetailResponse.java
index 7b8959e374..bfcbfc964b 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/LayoutRecDetailResponse.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/LayoutRecDetailResponse.java
@@ -20,10 +20,10 @@ package org.apache.kylin.rest.response;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 
-import com.clearspring.analytics.util.Lists;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import lombok.Data;
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 5e2143a39a..a3b1773ff8 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -1311,8 +1311,11 @@ public class ModelService extends AbstractModelService implements TableModelSupp
 
     public String getModelSql(String modelId, String project) {
         aclEvaluate.checkProjectReadPermission(project);
-        NDataModel model = getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
-        return PushDownUtil.generateFlatTableSql(model, false);
+        try (QueryContext queryContext = QueryContext.current()) {
+            queryContext.setAclInfo(AclPermissionUtil.createAclInfo(project, getCurrentUserGroups()));
+            NDataModel model = getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
+            return PushDownUtil.generateFlatTableSql(model, false);
+        }
     }
 
     public List<RelatedModelResponse> getRelateModels(String project, String table, String modelId) {
@@ -1751,9 +1754,10 @@ public class ModelService extends AbstractModelService implements TableModelSupp
             return;
         }
 
-        try {
+        try (QueryContext queryContext = QueryContext.current()) {
             String project = model.getProject();
             ProjectInstance prjInstance = getManager(NProjectManager.class).getProject(project);
+            queryContext.setAclInfo(AclPermissionUtil.createAclInfo(project, getCurrentUserGroups()));
             if (prjInstance.getSourceType() == ISourceAware.ID_SPARK
                     && model.getModelType() == NDataModel.ModelType.BATCH) {
                 SparkSession ss = SparderEnv.getSparkSession();
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/BasicSegmentParams.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/BasicSegmentParams.java
index 5190456985..ee4eef9f21 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/BasicSegmentParams.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/params/BasicSegmentParams.java
@@ -20,11 +20,10 @@ package org.apache.kylin.rest.service.params;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.rest.aspect.TransactionProjectUnit;
 
-import com.clearspring.analytics.util.Lists;
-
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java b/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
index fb4996ff17..a6112e47fa 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
@@ -18,118 +18,51 @@
 package org.apache.kylin.query.security;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
-import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.util.SqlBasicVisitor;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringHelper;
 import org.apache.kylin.metadata.acl.AclTCR;
 import org.apache.kylin.metadata.acl.AclTCRManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.IQueryTransformer;
-import org.apache.kylin.query.exception.NoAuthorizedColsError;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
 
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import lombok.Getter;
 
 public class HackSelectStarWithColumnACL implements IQueryTransformer, 
IPushDownConverter {
 
-    private static final String SELECT_STAR = "*";
-
-    static String getNewSelectClause(SqlNode sqlNode, String project, String 
defaultSchema,
-            QueryContext.AclInfo aclInfo) {
-        StringBuilder newSelectClause = new StringBuilder();
-        List<String> allCols = getColsCanAccess(sqlNode, project, 
defaultSchema, aclInfo);
-        if (CollectionUtils.isEmpty(allCols)) {
-            throw new NoAuthorizedColsError();
-        }
-        for (String col : allCols) {
-            if (!col.equals(allCols.get(allCols.size() - 1))) {
-                newSelectClause.append(col).append(", ");
-            } else {
-                newSelectClause.append(col);
-            }
-        }
-        return newSelectClause.toString();
-    }
-
-    static List<String> getColsCanAccess(SqlNode sqlNode, String project, 
String defaultSchema,
-            QueryContext.AclInfo aclInfo) {
-        List<String> cols = new ArrayList<>();
-        String user = Objects.nonNull(aclInfo) ? aclInfo.getUsername() : null;
-        Set<String> groups = Objects.nonNull(aclInfo) ? aclInfo.getGroups() : 
null;
-        final List<AclTCR> aclTCRs = 
AclTCRManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
-                .getAclTCRs(user, groups);
-        List<RowFilter.Table> tblWithAlias = 
RowFilter.getTblWithAlias(defaultSchema, getSingleSelect(sqlNode));
-        for (RowFilter.Table table : tblWithAlias) {
-            TableDesc tableDesc = 
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
-                    .getTableDesc(table.getName());
-            if (Objects.isNull(tableDesc)) {
-                throw new IllegalStateException(
-                        "Table " + table.getAlias() + " not found. Please add 
table " + table.getAlias()
-                                + " to data source. If this table does exist, 
mention it as DATABASE.TABLE.");
-            }
-
-            List<ColumnDesc> columns = 
Lists.newArrayList(tableDesc.getColumns());
-            Collections.sort(columns, 
Comparator.comparing(ColumnDesc::getZeroBasedIndex));
-            String quotingChar = 
Quoting.valueOf(KylinConfig.getInstanceFromEnv().getCalciteQuoting()).string;
-            for (ColumnDesc column : columns) {
-                if (aclTCRs.stream()
-                        .anyMatch(aclTCR -> 
aclTCR.isAuthorized(tableDesc.getIdentity(), column.getName()))) {
-                    StringBuilder sb = new StringBuilder();
-                    
sb.append(quotingChar).append(table.getAlias()).append(quotingChar) //
-                            .append('.') //
-                            
.append(quotingChar).append(column.getName()).append(quotingChar);
-                    cols.add(sb.toString());
-                }
-            }
-        }
-        return cols;
-    }
-
-    private static boolean isSingleSelectStar(SqlNode sqlNode) {
-        if (SelectNumVisitor.getSelectNum(sqlNode) != 1 || sqlNode instanceof 
SqlExplain) {
-            return false;
-        }
-        SqlSelect singleSelect = getSingleSelect(sqlNode);
-        return singleSelect.getSelectList().toString().equals(SELECT_STAR);
-    }
-
-    private static int getSelectStarPos(String sql, SqlNode sqlNode) {
-        SqlSelect singleSelect = getSingleSelect(sqlNode);
-        Pair<Integer, Integer> replacePos = 
CalciteParser.getReplacePos(singleSelect.getSelectList(), sql);
-        Preconditions.checkState(replacePos.getSecond() - 
replacePos.getFirst() == 1);
-        return replacePos.getFirst();
-    }
-
-    private static SqlSelect getSingleSelect(SqlNode sqlNode) {
-        if (sqlNode instanceof SqlOrderBy) {
-            SqlOrderBy orderBy = (SqlOrderBy) sqlNode;
-            return (SqlSelect) orderBy.query;
-        } else {
-            return (SqlSelect) sqlNode;
-        }
-    }
-
     private static boolean hasAdminPermission(QueryContext.AclInfo aclInfo) {
         if (Objects.isNull(aclInfo) || Objects.isNull(aclInfo.getGroups())) {
             return false;
@@ -153,49 +86,161 @@ public class HackSelectStarWithColumnACL implements IQueryTransformer, IPushDown
         try {
             sqlNode = CalciteParser.parse(sql, project);
         } catch (SqlParseException e) {
-            throw new KylinRuntimeException("Failed to parse SQL \'" + sql + 
"\', please make sure the SQL is valid");
+            throw new KylinRuntimeException("Failed to parse invalid SQL: " + 
sql);
         }
 
-        if (!isSingleSelectStar(sqlNode)) {
+        SelectStarAuthVisitor replacer = new SelectStarAuthVisitor(project, 
defaultSchema, aclLocal);
+        sqlNode.accept(replacer);
+        Map<SqlNode, String> resolved = replacer.getResolved();
+        if (resolved.isEmpty()) {
             return sql;
         }
-
-        String newSelectClause = getNewSelectClause(sqlNode, project, 
defaultSchema, aclLocal);
-        int selectStarPos = getSelectStarPos(sql, sqlNode);
-        StringBuilder result = new StringBuilder(sql);
-        result.replace(selectStarPos, selectStarPos + 1, newSelectClause);
-        return result.toString();
+        StringBuilder sb = new StringBuilder();
+        AtomicInteger offset = new AtomicInteger(0);
+        resolved.forEach((node, replaced) -> {
+            Pair<Integer, Integer> replacePos = 
CalciteParser.getReplacePos(node, sql);
+            sb.append(sql, offset.get(), replacePos.getKey());
+            sb.append(replaced);
+            offset.set(replacePos.getValue());
+        });
+        sb.append(sql.substring(offset.get()));
+        return sb.toString();
     }
 
-    static class SelectNumVisitor extends SqlBasicVisitor<SqlNode> {
-        int selectNum = 0;
+    static class SelectStarAuthVisitor extends SqlBasicVisitor<SqlNode> {
+        @Getter
+        Map<SqlNode, String> resolved = new LinkedHashMap<>();
+        /** A cache to avoid the same table generate the replaced sql twice. */
+        private final Map<String, String> tableToReplacedSubQuery = new 
HashMap<>();
+        private final Set<String> namesOfWithItems = new HashSet<>();
+        private final String defaultSchema;
+        private final List<AclTCR> aclTCRList;
+        private final NTableMetadataManager tableMgr;
+
+        SelectStarAuthVisitor(String project, String defaultSchema, 
QueryContext.AclInfo aclInfo) {
+            this.defaultSchema = defaultSchema;
+            KylinConfig config = NProjectManager.getProjectConfig(project);
+            this.tableMgr = NTableMetadataManager.getInstance(config, project);
+            // init aclTCR
+            String user = Objects.nonNull(aclInfo) ? aclInfo.getUsername() : 
null;
+            Set<String> groups = Objects.nonNull(aclInfo) ? 
aclInfo.getGroups() : null;
+            aclTCRList = AclTCRManager.getInstance(config, 
project).getAclTCRs(user, groups);
+        }
 
-        static int getSelectNum(SqlNode sqlNode) {
-            SelectNumVisitor snv = new SelectNumVisitor();
-            sqlNode.accept(snv);
-            return snv.getNum();
+        @Override
+        public SqlNode visit(SqlNodeList nodeList) {
+            for (SqlNode node : nodeList) {
+                if (node instanceof SqlWithItem) {
+                    SqlWithItem item = (SqlWithItem) node;
+                    item.query.accept(this);
+                    namesOfWithItems.add(item.name.toString());
+                }
+            }
+            return null;
         }
 
         @Override
         public SqlNode visit(SqlCall call) {
             if (call instanceof SqlSelect) {
-                selectNum++;
+                SqlSelect select = (SqlSelect) call;
+                markCall(select.getFrom());
+            }
+            if (call instanceof SqlBasicCall) {
+                if (isCallWithAlias(call)) {
+                    markCall(call);
+                } else {
+                    SqlBasicCall basicCall = (SqlBasicCall) call;
+                    for (SqlNode node : basicCall.getOperands()) {
+                        markCall(node);
+                    }
+                }
+            }
+
+            if (call instanceof SqlJoin) {
+                markCall(((SqlJoin) call).getLeft());
+                markCall(((SqlJoin) call).getRight());
+            }
+            if (call instanceof SqlWith) {
+                SqlWith sqlWith = (SqlWith) call;
+                sqlWith.withList.accept(this);
+                sqlWith.body.accept(this);
             }
             if (call instanceof SqlOrderBy) {
-                SqlOrderBy sqlOrderBy = (SqlOrderBy) call;
-                sqlOrderBy.query.accept(this);
+                
call.getOperandList().stream().filter(Objects::nonNull).forEach(node -> 
node.accept(this));
+            }
+            return null;
+        }
+
+        private void markCall(SqlNode operand) {
+            if (operand instanceof SqlIdentifier) {
+                String replaced = markTableIdentifier((SqlIdentifier) operand, 
null);
+                resolved.put(operand, replaced);
+                return;
+            } else if (isCallWithAlias(operand)) {
+                SqlNode[] operands = ((SqlBasicCall) operand).getOperands();
+                SqlNode tableNode = operands[0];
+                if (tableNode instanceof SqlIdentifier) {
+                    String replaced = markTableIdentifier((SqlIdentifier) 
tableNode, operands[1]);
+                    resolved.put(operand, replaced);
+                } else {
+                    tableNode.accept(this);
+                }
+                return;
+            }
+
+            // a sub-query, continue to mark
+            operand.accept(this);
+        }
+
+        private boolean isCallWithAlias(SqlNode from) {
+            return from instanceof SqlBasicCall && from.getKind() == 
SqlKind.AS;
+        }
+
+        private String markTableIdentifier(SqlIdentifier operand, SqlNode 
alias) {
+            if (namesOfWithItems.contains(operand.toString())) {
+                return operand.toString();
+            }
+            List<String> names = operand.names;
+            String schema = names.size() == 1 ? defaultSchema : names.get(0);
+            String table = names.size() == 1 ? names.get(0) : names.get(1);
+            TableDesc tableDesc = tableMgr.getTableDesc(schema + '.' + table);
+            if (tableDesc == null) {
+                throw new KylinRuntimeException("Failed to parse table: " + 
operand);
+            }
+
+            String tableIdentity = tableDesc.getDoubleQuoteIdentity();
+            if (tableToReplacedSubQuery.containsKey(tableIdentity)) {
+                return tableToReplacedSubQuery.get(tableIdentity);
             } else {
-                for (SqlNode operand : call.getOperandList()) {
-                    if (operand != null) {
-                        operand.accept(this);
+                List<String> authorizedCols = getAuthorizedCols(tableDesc);
+                String subQueryAlias = alias == null ? 
StringHelper.doubleQuote(table)
+                        : StringHelper.doubleQuote(alias.toString());
+                String replacedSubQuery = "( select " + String.join(", ", 
authorizedCols) //
+                        + " from " + tableIdentity + ") as " + subQueryAlias;
+                tableToReplacedSubQuery.put(tableIdentity, replacedSubQuery);
+                return replacedSubQuery;
+            }
+        }
+
+        List<String> getAuthorizedCols(TableDesc tableDesc) {
+            List<String> colList = new ArrayList<>();
+            List<ColumnDesc> columns = Arrays.stream(tableDesc.getColumns()) //
+                    
.sorted(Comparator.comparing(ColumnDesc::getZeroBasedIndex)) //
+                    .collect(Collectors.toList());
+            for (ColumnDesc column : columns) {
+                for (AclTCR aclTCR : aclTCRList) {
+                    if (aclTCR.isAuthorized(tableDesc.getIdentity(), 
column.getName())) {
+                        colList.add(getQuotedColName(column));
+                        break;
                     }
                 }
             }
-            return null;
+            return colList;
         }
 
-        private int getNum() {
-            return selectNum;
+        String getQuotedColName(ColumnDesc column) {
+            return StringHelper.doubleQuote(column.getTable().getName()) + "."
+                    + StringHelper.doubleQuote(column.getName());
         }
     }
 }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index b2aeac6995..a4d79d7d8f 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.ExecutionException;
@@ -58,6 +59,9 @@ import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.metadata.acl.AclTCR;
+import org.apache.kylin.metadata.acl.AclTCRManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.JoinDesc;
@@ -87,7 +91,7 @@ public class PushDownUtil {
 
     private static final Logger logger = LoggerFactory.getLogger("query");
 
-    // sql hint "/*+ MODEL_PRIORITY({cube_name}) */" 
+    // sql hint "/*+ MODEL_PRIORITY({cube_name}) */"
     private static final Pattern SQL_HINT_PATTERN = Pattern
             .compile("/\\*\\s*\\+\\s*(?i)MODEL_PRIORITY\\s*\\([\\s\\S]*\\)\\s*\\*/");
     public static final String DEFAULT_SCHEMA = "DEFAULT";
@@ -244,7 +248,9 @@ public class PushDownUtil {
         StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("SELECT ").append(sep);
 
-        List<TblColRef> tblColRefs = Lists.newArrayList(model.getEffectiveCols().values());
+        List<TblColRef> originTblColRefs = Lists.newArrayList(model.getEffectiveCols().values());
+        List<TblColRef> tblColRefs = getAuthorizedCols(model.getProject(), originTblColRefs);
+
         if (tblColRefs.isEmpty()) {
             sqlBuilder.append("1 ").append(sep);
         } else {
@@ -272,6 +278,28 @@ public class PushDownUtil {
         return new EscapeTransformer().transform(sqlBuilder.toString());
     }
 
+    private static List<TblColRef> getAuthorizedCols(String project, List<TblColRef> tableColRefs) {
+        QueryContext.AclInfo aclInfo = QueryContext.current().getAclInfo();
+        if (aclInfo != null && aclInfo.isHasAdminPermission()) {
+            return tableColRefs;
+        }
+        String user = Objects.nonNull(aclInfo) ? aclInfo.getUsername() : null;
+        Set<String> groups = Objects.nonNull(aclInfo) ? aclInfo.getGroups() : null;
+        KylinConfig config = NProjectManager.getProjectConfig(project);
+        List<AclTCR> aclTCRList = AclTCRManager.getInstance(config, project).getAclTCRs(user, groups);
+        List<TblColRef> result = Lists.newArrayList();
+        for (TblColRef tableColRef : tableColRefs) {
+            ColumnDesc column = tableColRef.getColumnDesc();
+            for (AclTCR aclTCR : aclTCRList) {
+                if (aclTCR.isAuthorized(tableColRef.getTableWithSchema(), column.getName())) {
+                    result.add(tableColRef);
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+
     public static String expandComputedColumnExp(NDataModel model, String project, String expression) {
         StringBuilder forCC = new StringBuilder();
         forCC.append("select ").append(expression).append(" ,").append(CC_SPLITTER) //
diff --git a/src/query/src/test/java/org/apache/kylin/query/security/HackSelectStarWithColumnACLTest.java b/src/query/src/test/java/org/apache/kylin/query/security/HackSelectStarWithColumnACLTest.java
index 5fe5a9d491..4ed0a797d1 100644
--- a/src/query/src/test/java/org/apache/kylin/query/security/HackSelectStarWithColumnACLTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/security/HackSelectStarWithColumnACLTest.java
@@ -18,61 +18,281 @@
 
 package org.apache.kylin.query.security;
 
-import java.util.ArrayList;
+import static org.apache.kylin.common.util.TestUtils.getTestConfig;
+
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.apache.kylin.metadata.acl.AclTCR;
 import org.apache.kylin.metadata.acl.AclTCRManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.query.exception.NoAuthorizedColsError;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
-public class HackSelectStarWithColumnACLTest extends NLocalFileMetadataTestCase {
-    private final static String PROJECT = "default";
-    private final static String SCHEMA = "DEFAULT";
+@MetadataInfo
+class HackSelectStarWithColumnACLTest {
+    private static final String PROJECT = "default";
+    private static final String SCHEMA = "DEFAULT";
+    private static final HackSelectStarWithColumnACL TRANSFORMER = new HackSelectStarWithColumnACL();
+    QueryContext current = QueryContext.current();
 
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
+    @BeforeEach
+    void setup() {
         getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "true");
+        prepareBasic();
+        current.setAclInfo(new QueryContext.AclInfo("u1", Sets.newHashSet("g1"), false));
     }
 
-    @After
-    public void after() {
-        cleanupTestMetadata();
+    @AfterAll
+    static void afterAll() {
+        QueryContext.current().close();
     }
 
     @Test
-    public void testTransform() {
-        prepareBasic();
-        HackSelectStarWithColumnACL transformer = new 
HackSelectStarWithColumnACL();
-        QueryContext.current().setAclInfo(new QueryContext.AclInfo("u1", 
Sets.newHashSet("g1"), false));
-        String sql = transformer.convert(
-                "select * from TEST_KYLIN_FACT t1 join TEST_ORDER t2 on 
t1.ORDER_ID = t2.ORDER_ID", PROJECT, SCHEMA);
-        String expectSQL = "select \"T1\".\"PRICE\", \"T1\".\"ITEM_COUNT\", 
\"T1\".\"ORDER_ID\", "
-                + "\"T2\".\"ORDER_ID\", \"T2\".\"BUYER_ID\", 
\"T2\".\"TEST_DATE_ENC\" "
-                + "from TEST_KYLIN_FACT t1 join TEST_ORDER t2 on t1.ORDER_ID = 
t2.ORDER_ID";
-        assertRoughlyEquals(expectSQL, sql);
+    void testJoin() {
+        // without alias
+        {
+            String sql = "select * from TEST_KYLIN_FACT join TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" "
+                    + "join ( select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\" "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
+        // with alias
+        {
+            String sql = "select * from TEST_KYLIN_FACT t1 join TEST_ORDER t2 
on t1.ORDER_ID = t2.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" from 
\"DEFAULT\".\"TEST_KYLIN_FACT\") as \"T1\" "
+                    + "join ( select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"T2\" "
+                    + "on t1.ORDER_ID = t2.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
+        // nested select star
+        {
+            String sql = "select * from (select * from TEST_KYLIN_FACT) t1 
join TEST_ORDER t2 "
+                    + "on t1.ORDER_ID = t2.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "select * from (" //
+                    + "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\""
+                    + ") t1 join ( select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\""
+                    + ") as \"T2\" on t1.ORDER_ID = t2.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
     }
 
     @Test
-    public void testTransformColumnStartWithNumberOrKeyword() {
-        
getTestConfig().setProperty("kylin.query.calcite.extras-props.quoting", 
"DOUBLE_QUOTE");
-        prepareBasic();
+    void testWithSubQuery() {
+        // simple case
+        {
+            String sql = "with test_order as (select * from test_order)\n"
+                    + "select * from TEST_KYLIN_FACT join TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "with test_order as (select * from ( "
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\")\n"
+                    + "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" "
+                    + "join TEST_ORDER on TEST_KYLIN_FACT.ORDER_ID = 
TEST_ORDER.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
+        // some content of with-body reuse with-items
+        {
+            String sql = "with test_order as (select * from test_order)\n"
+                    + "select * from TEST_KYLIN_FACT join (select * from 
TEST_ORDER) TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "with test_order as (select * from ( "
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\")\n"
+                    + "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" "
+                    + "join (select * from TEST_ORDER) TEST_ORDER on 
TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
+        // all contexts of with-body do not reuse any with-items
+        {
+            String sql = "with test_order as (select * from test_order)\n"
+                    + "select * from TEST_KYLIN_FACT join (select * from 
\"DEFAULT\".\"TEST_ORDER\") TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "with test_order as (select * from ( "
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\")\n"
+                    + "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" " //
+                    + "join (select * from ( " //
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" "
+                    + "from \"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\") 
TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
+        // all contexts of with-body reuse with-items
+        {
+            String sql = "with test_order as (select * from test_order), "
+                    + "test_kylin_fact as (select * from test_kylin_fact)\n"
+                    + "select * from TEST_KYLIN_FACT join (select * from 
TEST_ORDER) TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "with test_order as (" //
+                    + "select * from ( select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" " //
+                    + "from \"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\"), " 
//
+                    + "test_kylin_fact as ("
+                    + "select * from ( select 
\"TEST_KYLIN_FACT\".\"ORDER_ID\", \"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\")\n"
+                    + "select * from TEST_KYLIN_FACT join (select * from 
TEST_ORDER) TEST_ORDER "
+                    + "on TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID";
+            Assertions.assertEquals(expected, converted);
+        }
+    }
+
+    @Test
+    void testUnion() {
+        // without outer select
+        {
+            String sql = "select * from test_order union select * from 
test_order";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "select * from ( " //
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\" "
+                    + "union select * from ( " //
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\"";
+            Assertions.assertEquals(expected, converted);
+        }
+        // with outer select
+        {
+            String sql = "select * from (select * from test_order union select 
* from test_order)";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            String expected = "select * from (select * from ( " //
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\" "
+                    + "union select * from ( " //
+                    + "select \"TEST_ORDER\".\"ORDER_ID\", 
\"TEST_ORDER\".\"BUYER_ID\", "
+                    + "\"TEST_ORDER\".\"TEST_DATE_ENC\" from 
\"DEFAULT\".\"TEST_ORDER\") as \"TEST_ORDER\")";
+            Assertions.assertEquals(expected, converted);
+        }
+    }
+
+    @Test
+    void testInSubQuery() {
+        String sql = "select * from TEST_KYLIN_FACT "
+                + "where ITEM_COUNT in (select ITEM_COUNT from (select * from 
TEST_KYLIN_FACT) )";
+        String expected = "select * from ( "
+                + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" "
+                + "where ITEM_COUNT in (select ITEM_COUNT from (select * from 
TEST_KYLIN_FACT) )";
+        String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+        Assertions.assertEquals(expected, converted);
+    }
+
+    @Test
+    void testCaseWhen() {
+        {
+            String sql = "select (case when ITEM_COUNT > 0 " //
+                    + "then (case when order_id > 0 then order_id else 1 end)  
" //
+                    + "else null end)\n" //
+                    + "from TEST_KYLIN_FACT";
+            String expected = "select (case when ITEM_COUNT > 0 " //
+                    + "then (case when order_id > 0 then order_id else 1 end)  
" //
+                    + "else null end)\n" //
+                    + "from ( select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", " //
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" " //
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\"";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            Assertions.assertEquals(expected, converted);
+        }
+
+        {
+            String sql = "select * from test_kylin_fact " //
+                    + "where case when ITEM_COUNT > 10 then item_count else 0 
end";
+            String expected = "select * from ( " //
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", " //
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" " //
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" "
+                    + "where case when ITEM_COUNT > 10 then item_count else 0 
end";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            Assertions.assertEquals(expected, converted);
+        }
+    }
+
+    @Test
+    void testSingleTable() {
+        // without limit
+        {
+            String sql = "select * from \"DEFAULT\".\"TEST_KYLIN_FACT\"";
+            String expected = "select * from ( "
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\"";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            Assertions.assertEquals(expected, converted);
+        }
+        // with alias
+        {
+            String sql = "select * from test_kylin_fact as test_kylin_fact";
+            String expected = "select * from ( "
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\"";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            Assertions.assertEquals(expected, converted);
+        }
+        // with limit-offset
+        {
+            String sql = "select * from test_kylin_fact as test_kylin_fact 
limit 10 offset 2";
+            String expected = "select * from ( "
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\" limit 10 offset 2";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            Assertions.assertEquals(expected, converted);
+        }
+        // agg
+        {
+            String sql = "select count(*) from 
\"DEFAULT\".\"TEST_KYLIN_FACT\"";
+            String expected = "select count(*) from ( "
+                    + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                    + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\" "
+                    + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\"";
+            String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+            Assertions.assertEquals(expected, converted);
+        }
+    }
+
+    @Test
+    void testKeywordAsColName() {
         prepareMore();
         NTableMetadataManager tableMetadataManager = 
NTableMetadataManager.getInstance(getTestConfig(), PROJECT);
         TableDesc tableDesc = 
tableMetadataManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
@@ -87,28 +307,24 @@ public class HackSelectStarWithColumnACLTest extends NLocalFileMetadataTestCase
         colWithKeyword.setDatatype("date");
         colWithKeyword.setName("YEAR");
 
-        ArrayList<ColumnDesc> columnDescs = Lists.newArrayList(columns);
+        List<ColumnDesc> columnDescs = Lists.newArrayList(columns);
         columnDescs.add(colStartsWithNumber);
         columnDescs.add(colWithKeyword);
 
         tableDesc.setColumns(columnDescs.toArray(new ColumnDesc[0]));
         tableMetadataManager.updateTableDesc(tableDesc);
-        HackSelectStarWithColumnACL transformer = new 
HackSelectStarWithColumnACL();
-        QueryContext.current().setAclInfo(new QueryContext.AclInfo("u1", 
Sets.newHashSet("g1"), false));
-        String transformed = transformer.convert("select * from 
TEST_KYLIN_FACT", PROJECT, SCHEMA);
-        String expected = "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", " //
-                + "\"TEST_KYLIN_FACT\".\"PRICE\", " //
-                + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\", " //
-                + "\"TEST_KYLIN_FACT\".\"2D\", " //
-                + "\"TEST_KYLIN_FACT\".\"YEAR\" " //
-                + "from TEST_KYLIN_FACT";
-        Assert.assertEquals(expected, transformed);
+
+        String sql = "select * from TEST_KYLIN_FACT";
+        String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+        String expected = "select * from ( " //
+                + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", 
\"TEST_KYLIN_FACT\".\"PRICE\", "
+                + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\", 
\"TEST_KYLIN_FACT\".\"2D\", \"TEST_KYLIN_FACT\".\"YEAR\" "
+                + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\"";
+        Assertions.assertEquals(expected, converted);
     }
 
     @Test
-    public void testTransformColumnStartWithNumberOrKeyword2() {
-        
getTestConfig().setProperty("kylin.query.calcite.extras-props.quoting", 
"BACK_TICK");
-        prepareBasic();
+    void testColumnNameStartsWithNumber() {
         prepareMore();
         NTableMetadataManager tableMetadataManager = 
NTableMetadataManager.getInstance(getTestConfig(), PROJECT);
         TableDesc tableDesc = 
tableMetadataManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
@@ -123,86 +339,29 @@ public class HackSelectStarWithColumnACLTest extends NLocalFileMetadataTestCase
         colWithKeyword.setDatatype("date");
         colWithKeyword.setName("YEAR");
 
-        ArrayList<ColumnDesc> columnDescs = Lists.newArrayList(columns);
+        List<ColumnDesc> columnDescs = Lists.newArrayList(columns);
         columnDescs.add(colStartsWithNumber);
         columnDescs.add(colWithKeyword);
 
         tableDesc.setColumns(columnDescs.toArray(new ColumnDesc[0]));
         tableMetadataManager.updateTableDesc(tableDesc);
-        HackSelectStarWithColumnACL transformer = new 
HackSelectStarWithColumnACL();
-        QueryContext.current().setAclInfo(new QueryContext.AclInfo("u1", 
Sets.newHashSet("g1"), false));
-        String transformed = transformer.convert("select * from 
TEST_KYLIN_FACT", PROJECT, SCHEMA);
-        String expected = "select `TEST_KYLIN_FACT`.`ORDER_ID`, " //
-                + "`TEST_KYLIN_FACT`.`PRICE`, " //
-                + "`TEST_KYLIN_FACT`.`ITEM_COUNT`, " //
-                + "`TEST_KYLIN_FACT`.`2D`, " //
-                + "`TEST_KYLIN_FACT`.`YEAR` " //
-                + "from TEST_KYLIN_FACT";
-        Assert.assertEquals(expected, transformed);
-    }
 
-    @Test
-    public void testExplainSyntax() {
-        HackSelectStarWithColumnACL transformer = new 
HackSelectStarWithColumnACL();
-        String sql = "explain plan for select * from t";
-        assertRoughlyEquals(sql, transformer.convert("explain plan for select 
* from t", PROJECT, SCHEMA));
-    }
-
-    @Test
-    public void testGetNewSelectClause() {
-        prepareBasic();
-        final String sql = "select * from TEST_KYLIN_FACT t1 join TEST_ORDER 
t2 on t1.ORDER_ID = t2.ORDER_ID ";
-        final SqlNode sqlNode = getSqlNode(sql);
-        QueryContext.AclInfo aclInfo = new QueryContext.AclInfo("u1", 
Sets.newHashSet("g1"), false);
-        String newSelectClause = 
HackSelectStarWithColumnACL.getNewSelectClause(sqlNode, PROJECT, SCHEMA, 
aclInfo);
-        String expect = "\"T1\".\"PRICE\", \"T1\".\"ITEM_COUNT\", 
\"T1\".\"ORDER_ID\", "
-                + "\"T2\".\"ORDER_ID\", \"T2\".\"BUYER_ID\", 
\"T2\".\"TEST_DATE_ENC\"";
-        assertRoughlyEquals(expect, newSelectClause);
-
-        AclTCR empty = new AclTCR();
-        empty.setTable(new AclTCR.Table());
-        AclTCRManager manager = AclTCRManager.getInstance(getTestConfig(), 
PROJECT);
-        manager.updateAclTCR(empty, "u1", true);
-        manager.updateAclTCR(empty, "g1", false);
-
-        try {
-            HackSelectStarWithColumnACL.getNewSelectClause(sqlNode, PROJECT, 
SCHEMA, aclInfo);
-        } catch (Exception e) {
-            Assert.assertEquals(NoAuthorizedColsError.class, e.getClass());
-        }
+        String sql = "select * from TEST_KYLIN_FACT";
+        String converted = TRANSFORMER.convert(sql, PROJECT, SCHEMA);
+        String expected = "select * from ( " //
+                + "select \"TEST_KYLIN_FACT\".\"ORDER_ID\", " //
+                + "\"TEST_KYLIN_FACT\".\"PRICE\", " //
+                + "\"TEST_KYLIN_FACT\".\"ITEM_COUNT\", " //
+                + "\"TEST_KYLIN_FACT\".\"2D\", " //
+                + "\"TEST_KYLIN_FACT\".\"YEAR\" " //
+                + "from \"DEFAULT\".\"TEST_KYLIN_FACT\") as 
\"TEST_KYLIN_FACT\"";
+        Assertions.assertEquals(expected, converted);
     }
 
     @Test
-    public void testGetColsCanAccess() {
-
-        final String sql = "select * from TEST_KYLIN_FACT t1 join TEST_ORDER 
t2 on t1.ORDER_ID = t2.ORDER_ID";
-        final SqlNode sqlNode = getSqlNode(sql);
-        QueryContext.AclInfo aclInfo = new QueryContext.AclInfo("u1", 
Sets.newHashSet("g1"), false);
-        List<String> colsCanAccess = 
HackSelectStarWithColumnACL.getColsCanAccess(sqlNode, PROJECT, SCHEMA, aclInfo);
-        Assert.assertEquals(0, colsCanAccess.size());
-
-        prepareBasic();
-        colsCanAccess = HackSelectStarWithColumnACL.getColsCanAccess(sqlNode, 
PROJECT, SCHEMA, aclInfo);
-        Assert.assertEquals(6, colsCanAccess.size());
-    }
-
-    private SqlNode getSqlNode(String sql) {
-        SqlNode sqlNode;
-        try {
-            sqlNode = CalciteParser.parse(sql);
-        } catch (SqlParseException e) {
-            throw new RuntimeException("Failed to parse SQL \'" + sql + "\', 
please make sure the SQL is valid");
-        }
-        return sqlNode;
-    }
-
-    private void assertRoughlyEquals(String expect, String actual) {
-        String[] expectSplit = expect.split("\\s+");
-        String[] actualSplit = actual.split("\\s+");
-        Arrays.sort(expectSplit);
-        Arrays.sort(actualSplit);
-
-        Assert.assertArrayEquals(expectSplit, actualSplit);
+    void testExplainSyntax() {
+        String sql = "explain plan for select * from t";
+        Assertions.assertEquals(sql, TRANSFORMER.convert("explain plan for 
select * from t", PROJECT, SCHEMA));
     }
 
     private void prepareMore() {
@@ -241,7 +400,7 @@ public class HackSelectStarWithColumnACLTest extends NLocalFileMetadataTestCase
         AclTCR.Table g1t1 = new AclTCR.Table();
         AclTCR.ColumnRow g1cr1 = new AclTCR.ColumnRow();
         AclTCR.Column g1c1 = new AclTCR.Column();
-        g1c1.addAll(Arrays.asList("ORDER_ID"));
+        g1c1.add("ORDER_ID");
         g1cr1.setColumn(g1c1);
         g1t1.put("DEFAULT.TEST_KYLIN_FACT", g1cr1);
         g1a1.setTable(g1t1);
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java
index 5b91a8cef2..669fdde19c 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/PushDownUtilTest.java
@@ -20,12 +20,17 @@ package org.apache.kylin.query.util;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.exception.ServerErrorCode;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.common.util.TestUtils;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.acl.AclTCR;
+import org.apache.kylin.metadata.acl.AclTCRManager;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
@@ -34,65 +39,86 @@ import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.kylin.util.MetadataTestUtils;
+import org.glassfish.jersey.internal.guava.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.context.SecurityContextHolder;
 
-public class PushDownUtilTest extends NLocalFileMetadataTestCase {
+@MetadataInfo
+class PushDownUtilTest {
 
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
+    private static void setAdminAuthentication() {
+        TestingAuthenticationToken auth = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+        SecurityContextHolder.getContext().setAuthentication(auth);
     }
 
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
+    private static void setNormalAuthenticationAndAclTcr() {
+        TestingAuthenticationToken auth = new TestingAuthenticationToken("KYLIN", "KYLIN", Constant.ROLE_MODELER);
+        SecurityContextHolder.getContext().setAuthentication(auth);
+        AclTCRManager manager = AclTCRManager.getInstance(TestUtils.getTestConfig(), "default");
+        manager.updateAclTCR(new AclTCR(), "KYLIN", true);
+    }
+
+    private static QueryContext.AclInfo prepareRoleModelerAclInfo(String project) {
+        Set<String> groups = Sets.newHashSet();
+        groups.add(Constant.ROLE_MODELER);
+        return AclPermissionUtil.createAclInfo(project, groups);
+    }
+
+    private static QueryContext.AclInfo prepareRoleAdminAclInfo(String project) {
+        Set<String> groups = Sets.newHashSet();
+        groups.add("admin");
+        groups.add(Constant.ROLE_ADMIN);
+        return AclPermissionUtil.createAclInfo(project, groups);
     }
 
     @Test
-    public void testTryForcePushDown() {
+    void testTryForcePushDown() {
         try {
             QueryParams queryParams = new QueryParams();
             queryParams.setProject("default");
             queryParams.setSelect(true);
             queryParams.setForcedToPushDown(true);
             PushDownUtil.tryIterQuery(queryParams);
-            Assert.fail();
+            Assertions.fail();
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof KylinException);
-            Assert.assertEquals(ServerErrorCode.SPARK_FAILURE.toErrorCode(), 
((KylinException) e).getErrorCode());
+            Assertions.assertTrue(e instanceof KylinException);
+            
Assertions.assertEquals(ServerErrorCode.SPARK_FAILURE.toErrorCode(), 
((KylinException) e).getErrorCode());
         }
     }
 
     @Test
-    public void testTryWithPushDownDisable() {
+    void testTryWithPushDownDisable() {
         try {
-            overwriteSystemProp("kylin.query.pushdown-enabled", "false");
+            String project = "default";
+            MetadataTestUtils.updateProjectConfig(project, 
"kylin.query.pushdown-enabled", "false");
             QueryParams queryParams = new QueryParams();
             queryParams.setProject("default");
             queryParams.setSelect(true);
             queryParams.setForcedToPushDown(true);
             PushDownUtil.tryIterQuery(queryParams);
-            Assert.fail();
+            Assertions.fail();
         } catch (Exception e) {
-            Assert.assertTrue(e instanceof KylinException);
-            
Assert.assertEquals(QueryErrorCode.INVALID_PARAMETER_PUSH_DOWN.toErrorCode(),
+            Assertions.assertTrue(e instanceof KylinException);
+            
Assertions.assertEquals(QueryErrorCode.INVALID_PARAMETER_PUSH_DOWN.toErrorCode(),
                     ((KylinException) e).getErrorCode());
         }
     }
 
     @Test
-    public void testBacktickQuote() {
+    void testBacktickQuote() {
         String table = "db.table";
-        Assert.assertEquals("`db`.`table`", String.join(".", 
PushDownUtil.backtickQuote(table.split("\\."))));
+        Assertions.assertEquals("`db`.`table`", String.join(".", 
PushDownUtil.backtickQuote(table.split("\\."))));
     }
 
     @Test
-    public void testMassagePushDownSql() {
+    void testMassagePushDownSql() {
         KylinConfig config = KylinConfig.createKylinConfig(new Properties());
-        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = 
KylinConfig.setAndUnsetThreadLocalConfig(config)) {
+        try (KylinConfig.SetAndUnsetThreadLocalConfig ignored = 
KylinConfig.setAndUnsetThreadLocalConfig(config)) {
             config.setProperty("kylin.query.pushdown.converter-class-names",
                     SparkSQLFunctionConverter.class.getCanonicalName());
             String sql = "SELECT \"Z_PROVDASH_UM_ED\".\"GENDER\" AS 
\"GENDER\",\n"
@@ -105,25 +131,25 @@ public class PushDownUtilTest extends NLocalFileMetadataTestCase {
             String expectedSql = "SELECT `Z_PROVDASH_UM_ED`.`GENDER` AS 
`GENDER`,\n"
                     + "SUM(CAST(0 AS BIGINT)) AS 
`sum_Calculation_336925569152049156_ok`\n"
                     + "FROM `POPHEALTH_ANALYTICS`.`Z_PROVDASH_UM_ED` 
`Z_PROVDASH_UM_ED`";
-            Assert.assertEquals(expectedSql, massagedSql);
+            Assertions.assertEquals(expectedSql, massagedSql);
         }
     }
 
     @Test
-    public void testMassagePushDownSqlWithDoubleQuote() {
+    void testMassagePushDownSqlWithDoubleQuote() {
         KylinConfig config = KylinConfig.createKylinConfig(new Properties());
         String sql = "select '''',trans_id from test_kylin_fact where 
LSTG_FORMAT_NAME like '%''%' group by trans_id limit 2;";
         QueryParams queryParams = new QueryParams("", sql, "default", false);
         queryParams.setKylinConfig(config);
         String massagedSql = PushDownUtil.massagePushDownSql(queryParams);
         String expectedSql = "select '\\'', `TRANS_ID` from `TEST_KYLIN_FACT` 
where `LSTG_FORMAT_NAME` like '%\\'%' group by `TRANS_ID` limit 2";
-        Assert.assertEquals(expectedSql, massagedSql);
+        Assertions.assertEquals(expectedSql, massagedSql);
     }
 
     @Test
-    public void testMassagePushDownSqlWithDialectConverter() {
+    void testMassagePushDownSqlWithDialectConverter() {
         KylinConfig config = KylinConfig.createKylinConfig(new Properties());
-        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = 
KylinConfig.setAndUnsetThreadLocalConfig(config)) {
+        try (KylinConfig.SetAndUnsetThreadLocalConfig ignored = 
KylinConfig.setAndUnsetThreadLocalConfig(config)) {
             config.setProperty("kylin.query.pushdown.converter-class-names",
                     
"org.apache.kylin.query.util.DialectConverter,org.apache.kylin.source.adhocquery.DoubleQuotePushDownConverter,"
                             + 
SparkSQLFunctionConverter.class.getCanonicalName());
@@ -138,141 +164,182 @@ public class PushDownUtilTest extends NLocalFileMetadataTestCase {
             String expectedSql = "SELECT `Z_PROVDASH_UM_ED`.`GENDER` AS 
`GENDER`, "
                     + "SUM(CAST(0 AS BIGINT)) AS 
`sum_Calculation_336925569152049156_ok`\n"
                     + "FROM `POPHEALTH_ANALYTICS`.`Z_PROVDASH_UM_ED` AS 
`Z_PROVDASH_UM_ED`\n" + "LIMIT 1";
-            Assert.assertEquals(expectedSql, massagedSql);
+            Assertions.assertEquals(expectedSql, massagedSql);
         }
     }
 
     @Test
-    public void testReplaceDoubleQuoteToSingle() {
+    void testReplaceDoubleQuoteToSingle() {
         String sql = "select ab from table where aa = '' and bb = '''as''n'''";
         String resSql = "select ab from table where aa = '' and bb = 
'\\'as\\'n\\''";
-        Assert.assertEquals(resSql, PushDownUtil.replaceEscapedQuote(sql));
+        Assertions.assertEquals(resSql, PushDownUtil.replaceEscapedQuote(sql));
     }
 
     @Test
-    public void testGenerateFlatTableSql() {
-        String project = "default";
-        NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        NDataModel model = modelManager.getDataModelDescByAlias("test_bank");
-        String expected = "SELECT\n" //
-                + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
-                + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"DT\" as \"TEST_BANK_INCOME_DT\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
-                + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
-                + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
-                + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
-                + "WHERE\n" //
-                + "1 = 1";
-        Assert.assertEquals(expected, PushDownUtil.generateFlatTableSql(model, 
false));
+    void testGenerateFlatTableSql() {
+        try (QueryContext context = QueryContext.current()) {
+            String project = "default";
+            setNormalAuthenticationAndAclTcr();
+            context.setAclInfo(prepareRoleModelerAclInfo(project));
+            NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            NDataModel model = 
modelManager.getDataModelDescByAlias("test_bank");
+            String expected = "SELECT\n" //
+                    + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"DT\" as 
\"TEST_BANK_INCOME_DT\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
+                    + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
+                    + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
+                    + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
+                    + "WHERE\n" //
+                    + "1 = 1";
+            Assertions.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(model, false));
+        }
     }
 
     @Test
-    public void testGenerateFlatTableSqlWithCCJoin() {
-        String project = "default";
-        NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        NDataModel model = modelManager.getDataModelDescByAlias("test_bank");
-        updateModelToAddCC(project, model);
-        // change join condition
-        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
-            modelMgr.updateDataModel(model.getUuid(), copyForWrite -> {
-                List<JoinTableDesc> joinTables = copyForWrite.getJoinTables();
-                TableRef rootTableRef = copyForWrite.getRootFactTable();
-                TblColRef cc1 = rootTableRef.getColumn("CC1");
-                JoinDesc join = joinTables.get(0).getJoin();
-                join.setForeignKeyColumns(new TblColRef[] { cc1 });
-                join.setForeignKey(new String[] { "TEST_BANK_INCOME.CC1" });
-            });
-            return null;
-        }, project);
-        String expected = "SELECT\n" //
-                + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
-                + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"DT\" as \"TEST_BANK_INCOME_DT\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
-                + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
-                + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
-                + "ON SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n"
-                + "WHERE\n" //
-                + "1 = 1";
-        NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
-        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
+    void testGenerateFlatTableSqlOfRoleModeler() {
+        try (QueryContext context = QueryContext.current()) {
+            String project = "default";
+            setAdminAuthentication();
+            context.setAclInfo(prepareRoleAdminAclInfo(project));
+            NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            NDataModel model = 
modelManager.getDataModelDescByAlias("test_bank");
+            String expected = "SELECT\n" //
+                    + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"DT\" as 
\"TEST_BANK_INCOME_DT\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
+                    + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
+                    + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
+                    + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
+                    + "WHERE\n" //
+                    + "1 = 1";
+            Assertions.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(model, false));
+        }
+    }
 
+    @Test
+    void testGenerateFlatTableSqlWithCCJoin() {
+        try (QueryContext context = QueryContext.current()) {
+            String project = "default";
+            setNormalAuthenticationAndAclTcr();
+            context.setAclInfo(prepareRoleModelerAclInfo(project));
+            NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            NDataModel model = 
modelManager.getDataModelDescByAlias("test_bank");
+            updateModelToAddCC(project, model);
+            // change join condition
+            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
+                modelMgr.updateDataModel(model.getUuid(), copyForWrite -> {
+                    List<JoinTableDesc> joinTables = 
copyForWrite.getJoinTables();
+                    TableRef rootTableRef = copyForWrite.getRootFactTable();
+                    TblColRef cc1 = rootTableRef.getColumn("CC1");
+                    JoinDesc join = joinTables.get(0).getJoin();
+                    join.setForeignKeyColumns(new TblColRef[] { cc1 });
+                    join.setForeignKey(new String[] { "TEST_BANK_INCOME.CC1" 
});
+                });
+                return null;
+            }, project);
+            String expected = "SELECT\n" //
+                    + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"DT\" as 
\"TEST_BANK_INCOME_DT\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
+                    + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
+                    + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
+                    + "ON SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n"
+                    + "WHERE\n" //
+                    + "1 = 1";
+            NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
+            Assertions.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
+        }
     }
 
     @Test
-    public void testGenerateFlatTableSqlWithFilterCondition() {
-        String project = "default";
-        NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        NDataModel model = modelManager.getDataModelDescByAlias("test_bank");
-        updateModelToAddCC(project, model);
-        // change filter condition
-        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
-            modelMgr.updateDataModel(model.getUuid(), copyForWrite -> {
-                copyForWrite.setFilterCondition(
-                        "SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) = 
'china' and cc1 = 'china'");
-            });
-            return null;
-        }, project);
-        String expected = "SELECT\n" //
-                + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
-                + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"DT\" as \"TEST_BANK_INCOME_DT\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
-                + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
-                + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
-                + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
-                + "WHERE\n" //
-                + "1 = 1\n" //
-                + " AND (SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) = 
'china' and cc1 = 'china')";
-        NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
-        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
+    void testGenerateFlatTableSqlWithFilterCondition() {
+        try (QueryContext context = QueryContext.current()) {
+            String project = "default";
+            setNormalAuthenticationAndAclTcr();
+            context.setAclInfo(prepareRoleModelerAclInfo(project));
+            NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            NDataModel model = 
modelManager.getDataModelDescByAlias("test_bank");
+            updateModelToAddCC(project, model);
+            // change filter condition
+            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
+                modelMgr.updateDataModel(model.getUuid(), copyForWrite -> {
+                    copyForWrite.setFilterCondition(
+                            "SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) 
= 'china' and cc1 = 'china'");
+                });
+                return null;
+            }, project);
+            String expected = "SELECT\n" //
+                    + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"DT\" as 
\"TEST_BANK_INCOME_DT\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
+                    + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
+                    + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
+                    + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
+                    + "WHERE\n" //
+                    + "1 = 1\n" //
+                    + " AND (SUBSTRING(\"TEST_BANK_INCOME\".\"COUNTRY\", 0, 4) 
= 'china' and cc1 = 'china')";
+            NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
+            Assertions.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
+        }
     }
 
     @Test
-    public void testGenerateFlatTableSqlWithSpecialFunctions() {
-        String project = "default";
-        NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        NDataModel model = modelManager.getDataModelDescByAlias("test_bank");
-        updateModelToAddCC(project, model);
-        // change filter condition
-        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
-            modelMgr.updateDataModel(model.getUuid(), copyForWrite -> {
-                copyForWrite.setFilterCondition("timestampadd(day, 1, 
current_date) = '2012-01-01' and cc1 = 'china'");
-            });
-            return null;
-        }, project);
-        String expected = "SELECT\n" //
-                + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
-                + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
-                + ", \"TEST_BANK_INCOME\".\"DT\" as \"TEST_BANK_INCOME_DT\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
-                + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
-                + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
-                + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
-                + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
-                + "WHERE\n" //
-                + "1 = 1\n" //
-                + " AND (TIMESTAMPADD(day, 1, current_date) = '2012-01-01' and 
cc1 = 'china')";
-        NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
-        Assert.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
+    void testGenerateFlatTableSqlWithSpecialFunctions() {
+        try (QueryContext context = QueryContext.current()) {
+            String project = "default";
+            setNormalAuthenticationAndAclTcr();
+            context.setAclInfo(prepareRoleModelerAclInfo(project));
+            NDataModelManager modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            NDataModel model = 
modelManager.getDataModelDescByAlias("test_bank");
+            updateModelToAddCC(project, model);
+            // change filter condition
+            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                NDataModelManager modelMgr = 
NDataModelManager.getInstance(kylinConfig, project);
+                modelMgr.updateDataModel(model.getUuid(), copyForWrite -> {
+                    copyForWrite
+                            .setFilterCondition("timestampadd(day, 1, 
current_date) = '2012-01-01' and cc1 = 'china'");
+                });
+                return null;
+            }, project);
+            String expected = "SELECT\n" //
+                    + "\"TEST_BANK_INCOME\".\"COUNTRY\" as 
\"TEST_BANK_INCOME_COUNTRY\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"INCOME\" as 
\"TEST_BANK_INCOME_INCOME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"NAME\" as 
\"TEST_BANK_INCOME_NAME\"\n"
+                    + ", \"TEST_BANK_INCOME\".\"DT\" as 
\"TEST_BANK_INCOME_DT\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"COUNTRY\" as 
\"TEST_BANK_LOCATION_COUNTRY\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"OWNER\" as 
\"TEST_BANK_LOCATION_OWNER\"\n"
+                    + ", \"TEST_BANK_LOCATION\".\"LOCATION\" as 
\"TEST_BANK_LOCATION_LOCATION\"\n"
+                    + "FROM \"DEFAULT\".\"TEST_BANK_INCOME\" as 
\"TEST_BANK_INCOME\"\n"
+                    + "INNER JOIN \"DEFAULT\".\"TEST_BANK_LOCATION\" as 
\"TEST_BANK_LOCATION\"\n"
+                    + "ON \"TEST_BANK_INCOME\".\"COUNTRY\" = 
\"TEST_BANK_LOCATION\".\"COUNTRY\"\n" //
+                    + "WHERE\n" //
+                    + "1 = 1\n" //
+                    + " AND (TIMESTAMPADD(day, 1, current_date) = '2012-01-01' 
and cc1 = 'china')";
+            NDataModel updatedModel = 
modelManager.getDataModelDesc(model.getUuid());
+            Assertions.assertEquals(expected, 
PushDownUtil.generateFlatTableSql(updatedModel, false));
+        }
     }
 
     private void updateModelToAddCC(String project, NDataModel model) {
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
index 52943ecd2e..cb1e1d3547 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.engine.spark.ExecutableUtils;
 import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.metadata.cube.model.NDataLayout;
@@ -38,8 +39,6 @@ import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
-import com.clearspring.analytics.util.Lists;
-
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index 6f3b1cf5ed..12b91f0932 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -45,6 +45,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.guava30.shaded.common.cache.Cache;
 import org.apache.kylin.guava30.shaded.common.cache.CacheBuilder;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -64,8 +65,6 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.clearspring.analytics.util.Lists;
-
 import lombok.val;
 
 public class NSparkMetadataExplorer implements ISourceMetadataExplorer, 
ISampleDataDeployer, Serializable {
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkMergingJobTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkMergingJobTest.java
index e41fa4e652..7a09678bfc 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkMergingJobTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkMergingJobTest.java
@@ -18,14 +18,16 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import com.clearspring.analytics.util.Lists;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
-import lombok.val;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.engine.spark.IndexDataConstructor;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -47,9 +49,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
+import lombok.val;
 
 public class NSparkMergingJobTest extends NLocalWithSparkSessionTest {
 


Reply via email to