This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new da33a48f39 [refactor](policy) Refactor the hierarchy of Policy. (#9786)
da33a48f39 is described below
commit da33a48f396f33983502fd74aaea9b5aa6384077
Author: pengxiangyu <[email protected]>
AuthorDate: Sat Jun 4 11:29:09 2022 +0800
[refactor](policy) Refactor the hierarchy of Policy. (#9786)
The RowPolicy extends Policy
---
.../org/apache/doris/common/FeMetaVersion.java | 2 +-
.../apache/doris/analysis/CreatePolicyStmt.java | 4 +-
.../org/apache/doris/analysis/StmtRewriter.java | 4 +-
.../java/org/apache/doris/persist/EditLog.java | 9 +-
.../org/apache/doris/persist/gson/GsonUtils.java | 9 +
.../main/java/org/apache/doris/policy/Policy.java | 111 ++++-----
.../java/org/apache/doris/policy/PolicyMgr.java | 263 ++++++++++-----------
.../org/apache/doris/policy/PolicyTypeEnum.java | 2 +-
.../doris/policy/{Policy.java => RowPolicy.java} | 133 ++++++-----
.../java/org/apache/doris/policy/PolicyTest.java | 42 ++++
10 files changed, 301 insertions(+), 278 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 6158431607..70cacd7a1c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -40,7 +40,7 @@ public final class FeMetaVersion {
public static final int VERSION_109 = 109;
// For routine load user info
public static final int VERSION_110 = 110;
- // NOTE: when increment meta version, should assign the latest version to
VERSION_CURRENT
+ // note: when increment meta version, should assign the latest version to
VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_110;
// all logs meta version should >= the minimum version, so that we could
remove many if clause, for example
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
index b8771575da..3f8c80c9cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
@@ -59,8 +59,8 @@ public class CreatePolicyStmt extends DdlStmt {
/**
* Use for cup.
**/
- public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String
policyName, TableName tableName, String filterType,
- UserIdentity user, Expr wherePredicate) {
+ public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String
policyName, TableName tableName,
+ String filterType, UserIdentity user, Expr
wherePredicate) {
this.type = type;
this.ifNotExists = ifNotExists;
this.policyName = policyName;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index 1700c64ae8..42a0188a84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -27,7 +27,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.TableAliasGenerator;
import org.apache.doris.common.UserException;
-import org.apache.doris.policy.Policy;
+import org.apache.doris.policy.RowPolicy;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
@@ -1187,7 +1187,7 @@ public class StmtRewriter {
Database db = currentCatalog.getDbOrAnalysisException(dbName);
long dbId = db.getId();
long tableId = table.getId();
- Policy matchPolicy =
currentCatalog.getPolicyMgr().getMatchRowPolicy(dbId, tableId, user);
+ RowPolicy matchPolicy =
currentCatalog.getPolicyMgr().getMatchTablePolicy(dbId, tableId, user);
if (matchPolicy == null) {
continue;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index ca958128ab..de497deb75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -65,6 +65,7 @@ import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
+import org.apache.doris.policy.RowPolicy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -812,7 +813,7 @@ public class EditLog {
break;
}
case OperationType.OP_CREATE_POLICY: {
- Policy log = (Policy) journal.getData();
+ RowPolicy log = (RowPolicy) journal.getData();
catalog.getPolicyMgr().replayCreate(log);
break;
}
@@ -1425,7 +1426,11 @@ public class EditLog {
}
public void logCreatePolicy(Policy policy) {
- logEdit(OperationType.OP_CREATE_POLICY, policy);
+ if (policy instanceof RowPolicy) {
+ logEdit(OperationType.OP_CREATE_POLICY, policy);
+ } else {
+ LOG.error("invalid policy: " + policy.getType().name());
+ }
}
public void logDropPolicy(DropPolicyLog log) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 7f5c4170c3..08a61c5bb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -35,6 +35,8 @@ import
org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.canal.CanalSyncJob;
+import org.apache.doris.policy.Policy;
+import org.apache.doris.policy.RowPolicy;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@@ -130,6 +132,12 @@ public class GsonUtils {
.of(LoadJobStateUpdateInfo.class, "clazz")
.registerSubtype(SparkLoadJobStateUpdateInfo.class,
SparkLoadJobStateUpdateInfo.class.getSimpleName());
+
+ // runtime adapter for class "Policy"
+ private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory
= RuntimeTypeAdapterFactory
+ .of(Policy.class, "clazz")
+ .registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName());
+
// the builder of GSON instance.
// Add any other adapters if necessary.
private static final GsonBuilder GSON_BUILDER = new GsonBuilder()
@@ -144,6 +152,7 @@ public class GsonUtils {
.registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
.registerTypeAdapterFactory(syncJobTypeAdapterFactory)
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
+ .registerTypeAdapterFactory(policyTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new
ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new
AtomicBooleanAdapter());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
index d617dda050..894bc463ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
@@ -18,9 +18,6 @@
package org.apache.doris.policy;
import org.apache.doris.analysis.CreatePolicyStmt;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@@ -28,69 +25,53 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
-import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
-import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.StringReader;
import java.util.List;
/**
- * Save policy for filtering data.
+ * Base class for Policy.
**/
@Data
-@AllArgsConstructor
-public class Policy implements Writable, GsonPostProcessable {
-
- public static final String ROW_POLICY = "ROW";
+public abstract class Policy implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(Policy.class);
- @SerializedName(value = "dbId")
- private long dbId;
-
- @SerializedName(value = "tableId")
- private long tableId;
-
- @SerializedName(value = "policyName")
- private String policyName;
+ @SerializedName(value = "policyId")
+ protected long policyId = -1;
- /**
- * ROW.
- **/
@SerializedName(value = "type")
- private PolicyTypeEnum type;
+ protected PolicyTypeEnum type = null;
- /**
- * PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type
prevails.
- **/
- @SerializedName(value = "filterType")
- private final FilterType filterType;
-
- private Expr wherePredicate;
+ @SerializedName(value = "policyName")
+ protected String policyName = null;
- /**
- * Policy bind user.
- **/
- @SerializedName(value = "user")
- private final UserIdentity user;
+ public Policy() {
+ policyId = Catalog.getCurrentCatalog().getNextId();
+ }
/**
- * Use for Serialization/deserialization.
- **/
- @SerializedName(value = "originStmt")
- private String originStmt;
+ * Base class for Policy.
+ *
+ * @param type policy type
+ * @param policyName policy name
+ */
+ public Policy(final PolicyTypeEnum type, final String policyName) {
+ policyId = Catalog.getCurrentCatalog().getNextId();
+ this.type = type;
+ this.policyName = policyName;
+ }
/**
* Trans stmt to Policy.
@@ -101,22 +82,22 @@ public class Policy implements Writable,
GsonPostProcessable {
curDb = ConnectContext.get().getDatabase();
}
Database db =
Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
- Table table =
db.getTableOrAnalysisException(stmt.getTableName().getTbl());
UserIdentity userIdent = stmt.getUser();
userIdent.analyze(ConnectContext.get().getClusterName());
- return new Policy(db.getId(), table.getId(), stmt.getPolicyName(),
stmt.getType(), stmt.getFilterType(),
- stmt.getWherePredicate(), userIdent,
stmt.getOrigStmt().originStmt);
+ switch (stmt.getType()) {
+ case ROW:
+ default:
+ Table table =
db.getTableOrAnalysisException(stmt.getTableName().getTbl());
+ return new RowPolicy(stmt.getType(), stmt.getPolicyName(),
db.getId(), userIdent,
+ stmt.getOrigStmt().originStmt, table.getId(),
stmt.getFilterType(),
+ stmt.getWherePredicate());
+ }
}
/**
* Use for SHOW POLICY.
**/
- public List<String> getShowInfo() throws AnalysisException {
- Database database =
Catalog.getCurrentCatalog().getDbOrAnalysisException(this.dbId);
- Table table = database.getTableOrAnalysisException(this.tableId);
- return Lists.newArrayList(this.policyName, database.getFullName(),
table.getName(), this.type.name(),
- this.filterType.name(), this.wherePredicate.toSql(),
this.user.getQualifiedUser(), this.originStmt);
- }
+ public abstract List<String> getShowInfo() throws AnalysisException;
@Override
public void write(DataOutput out) throws IOException {
@@ -124,31 +105,27 @@ public class Policy implements Writable,
GsonPostProcessable {
}
/**
- * Read policy from file.
+ * Read Policy from file.
**/
public static Policy read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, Policy.class);
}
- @Override
- public void gsonPostProcess() throws IOException {
- if (wherePredicate != null) {
- return;
- }
- try {
- SqlScanner input = new SqlScanner(new StringReader(originStmt),
0L);
- SqlParser parser = new SqlParser(input);
- CreatePolicyStmt stmt = (CreatePolicyStmt)
SqlParserUtils.getFirstStmt(parser);
- wherePredicate = stmt.getWherePredicate();
- } catch (Exception e) {
- throw new IOException("policy parse originStmt error", e);
- }
+ protected boolean checkMatched(PolicyTypeEnum type, String policyName) {
+ return (type == null || type.equals(this.type))
+ && (policyName == null || StringUtils.equals(policyName,
this.policyName));
}
- @Override
- public Policy clone() {
- return new Policy(this.dbId, this.tableId, this.policyName, this.type,
this.filterType, this.wherePredicate,
- this.user, this.originStmt);
+ // it is used to check whether this policy is in PolicyMgr
+ public boolean matchPolicy(Policy checkedPolicyCondition) {
+ return checkMatched(checkedPolicyCondition.getType(),
checkedPolicyCondition.getPolicyName());
+ }
+
+ public boolean matchPolicy(DropPolicyLog checkedDropPolicyLogCondition) {
+ return checkMatched(checkedDropPolicyLogCondition.getType(),
checkedDropPolicyLogCondition.getPolicyName());
}
+
+ public abstract boolean isInvalid();
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index c3a1bebbf8..0746e191b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -21,7 +21,6 @@ import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
-import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@@ -37,7 +36,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -61,14 +59,14 @@ public class PolicyMgr implements Writable {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- @SerializedName(value = "dbIdToPolicyMap")
- private Map<Long, List<Policy>> dbIdToPolicyMap = Maps.newConcurrentMap();
+ @SerializedName(value = "typeToPolicyMap")
+ private Map<PolicyTypeEnum, List<Policy>> typeToPolicyMap =
Maps.newConcurrentMap();
/**
* Cache merge policy for match.
* key:dbId:tableId-type-user
**/
- private Map<Long, Map<String, Policy>> dbIdToMergePolicyMap =
Maps.newConcurrentMap();
+ private Map<Long, Map<String, RowPolicy>> dbIdToMergeTablePolicyMap =
Maps.newConcurrentMap();
private Set<String> userPolicySet = Sets.newConcurrentHashSet();
@@ -95,8 +93,7 @@ public class PolicyMgr implements Writable {
Policy policy = Policy.fromCreateStmt(stmt);
writeLock();
try {
- if (existPolicy(policy.getDbId(), policy.getTableId(),
policy.getType(),
- policy.getPolicyName(), policy.getUser())) {
+ if (existPolicy(policy)) {
if (stmt.isIfNotExists()) {
return;
}
@@ -113,45 +110,47 @@ public class PolicyMgr implements Writable {
* Drop policy through stmt.
**/
public void dropPolicy(DropPolicyStmt stmt) throws DdlException,
AnalysisException {
- DropPolicyLog policy = DropPolicyLog.fromDropStmt(stmt);
+ DropPolicyLog dropPolicyLog = DropPolicyLog.fromDropStmt(stmt);
writeLock();
try {
- if (!existPolicy(policy.getDbId(), policy.getTableId(),
policy.getType(),
- policy.getPolicyName(), policy.getUser())) {
+ if (!existPolicy(dropPolicyLog)) {
if (stmt.isIfExists()) {
return;
}
- throw new DdlException("the policy " + policy.getPolicyName()
+ " not exist");
+ throw new DdlException("the policy " +
dropPolicyLog.getPolicyName() + " not exist");
}
- unprotectedDrop(policy);
- Catalog.getCurrentCatalog().getEditLog().logDropPolicy(policy);
+ unprotectedDrop(dropPolicyLog);
+
Catalog.getCurrentCatalog().getEditLog().logDropPolicy(dropPolicyLog);
} finally {
writeUnlock();
}
}
+ /**
+ * Check whether this user has policy.
+ *
+ * @param user user who has policy
+ * @return exist or not
+ */
public boolean existPolicy(String user) {
return userPolicySet.contains(user);
}
- private boolean existPolicy(long dbId, long tableId, PolicyTypeEnum type,
String policyName, UserIdentity user) {
- List<Policy> policies = getDbPolicies(dbId);
- return policies.stream().anyMatch(policy -> matchPolicy(policy,
tableId, type, policyName, user));
+ private boolean existPolicy(Policy checkedPolicy) {
+ List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
+ return policies.stream().anyMatch(policy ->
policy.matchPolicy(checkedPolicy));
}
- private List<Policy> getDbPolicies(long dbId) {
- if (dbIdToPolicyMap == null) {
- return new ArrayList<>();
- }
- return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>());
+ private boolean existPolicy(DropPolicyLog checkedDropPolicy) {
+ List<Policy> policies = getPoliciesByType(checkedDropPolicy.getType());
+ return policies.stream().anyMatch(policy ->
policy.matchPolicy(checkedDropPolicy));
}
- private List<Policy> getDbUserPolicies(long dbId, String user) {
- if (dbIdToPolicyMap == null) {
+ private List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
+ if (typeToPolicyMap == null) {
return new ArrayList<>();
}
- return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()).stream()
- .filter(p ->
p.getUser().getQualifiedUser().equals(user)).collect(Collectors.toList());
+ return typeToPolicyMap.getOrDefault(policyType, new ArrayList<>());
}
public void replayCreate(Policy policy) {
@@ -163,12 +162,10 @@ public class PolicyMgr implements Writable {
if (policy == null) {
return;
}
- long dbId = policy.getDbId();
- List<Policy> dbPolicies = getDbPolicies(dbId);
+ List<Policy> dbPolicies = getPoliciesByType(policy.getType());
dbPolicies.add(policy);
- dbIdToPolicyMap.put(dbId, dbPolicies);
- updateMergePolicyMap(dbId);
- userPolicySet.add(policy.getUser().getQualifiedUser());
+ typeToPolicyMap.put(policy.getType(), dbPolicies);
+ updateMergeTablePolicyMap();
}
public void replayDrop(DropPolicyLog log) {
@@ -178,42 +175,26 @@ public class PolicyMgr implements Writable {
private void unprotectedDrop(DropPolicyLog log) {
long dbId = log.getDbId();
- List<Policy> policies = getDbPolicies(dbId);
- policies.removeIf(p -> matchPolicy(p, log.getTableId(), log.getType(),
log.getPolicyName(), log.getUser()));
- dbIdToPolicyMap.put(dbId, policies);
- updateMergePolicyMap(dbId);
- if (log.getUser() == null) {
- updateAllUserPolicySet();
- } else {
- String user = log.getUser().getQualifiedUser();
- if (!existUserPolicy(user)) {
- userPolicySet.remove(user);
- }
- }
- }
-
- private boolean matchPolicy(Policy policy, long tableId, PolicyTypeEnum
type,
- String policyName, UserIdentity user) {
- return policy.getTableId() == tableId
- && policy.getType().equals(type)
- && StringUtils.equals(policy.getPolicyName(), policyName)
- && (user == null ||
StringUtils.equals(policy.getUser().getQualifiedUser(),
user.getQualifiedUser()));
+ List<Policy> policies = getPoliciesByType(log.getType());
+ policies.removeIf(policy -> policy.matchPolicy(log));
+ typeToPolicyMap.put(log.getType(), policies);
+ updateMergeTablePolicyMap();
}
/**
* Match row policy and return it.
**/
- public Policy getMatchRowPolicy(long dbId, long tableId, String user) {
+ public RowPolicy getMatchTablePolicy(long dbId, long tableId, String user)
{
readLock();
try {
- if (!dbIdToMergePolicyMap.containsKey(dbId)) {
+ if (!dbIdToMergeTablePolicyMap.containsKey(dbId)) {
return null;
}
- String key = Joiner.on("-").join(tableId, Policy.ROW_POLICY, user);
- if (!dbIdToMergePolicyMap.get(dbId).containsKey(key)) {
+ String key = Joiner.on("-").join(tableId,
PolicyTypeEnum.ROW.name(), user);
+ if (!dbIdToMergeTablePolicyMap.get(dbId).containsKey(key)) {
return null;
}
- return dbIdToMergePolicyMap.get(dbId).get(key);
+ return dbIdToMergeTablePolicyMap.get(dbId).get(key);
} finally {
readUnlock();
}
@@ -224,16 +205,25 @@ public class PolicyMgr implements Writable {
**/
public ShowResultSet showPolicy(ShowPolicyStmt showStmt) throws
AnalysisException {
List<List<String>> rows = Lists.newArrayList();
- List<Policy> policies;
long currentDbId = ConnectContext.get().getCurrentDbId();
- if (showStmt.getUser() == null) {
- policies =
Catalog.getCurrentCatalog().getPolicyMgr().getDbPolicies(currentDbId);
- } else {
- policies = Catalog.getCurrentCatalog().getPolicyMgr()
- .getDbUserPolicies(currentDbId,
showStmt.getUser().getQualifiedUser());
+ Policy checkedPolicy = null;
+ switch (showStmt.getType()) {
+ case ROW:
+ default:
+ RowPolicy rowPolicy = new RowPolicy();
+ if (showStmt.getUser() != null) {
+ rowPolicy.setUser(showStmt.getUser());
+ }
+ if (currentDbId != -1) {
+ rowPolicy.setDbId(currentDbId);
+ }
+ checkedPolicy = rowPolicy;
}
+ final Policy finalCheckedPolicy = checkedPolicy;
+ List<Policy> policies =
typeToPolicyMap.getOrDefault(showStmt.getType(), new ArrayList<>()).stream()
+ .filter(p ->
p.matchPolicy(finalCheckedPolicy)).collect(Collectors.toList());
for (Policy policy : policies) {
- if (policy.getWherePredicate() == null) {
+ if (policy.isInvalid()) {
continue;
}
rows.add(policy.getShowInfo());
@@ -241,94 +231,87 @@ public class PolicyMgr implements Writable {
return new ShowResultSet(showStmt.getMetaData(), rows);
}
- private void updateAllMergePolicyMap() {
- dbIdToPolicyMap.forEach((dbId, policies) ->
updateMergePolicyMap(dbId));
- }
-
- private void updateAllUserPolicySet() {
- userPolicySet.clear();
- dbIdToPolicyMap.forEach((dbId, policies) ->
- policies.forEach(policy ->
userPolicySet.add(policy.getUser().getQualifiedUser())));
- }
-
-
- private boolean existUserPolicy(String user) {
- readLock();
- try {
- for (Map<String, Policy> policies : dbIdToMergePolicyMap.values())
{
- for (Policy policy : policies.values()) {
- if (policy.getUser().getQualifiedUser().equals(user)) {
- return true;
- }
- }
- }
- return false;
- } finally {
- readUnlock();
- }
-
- }
-
/**
* The merge policy cache needs to be regenerated after the update.
**/
- private void updateMergePolicyMap(long dbId) {
+ private void updateMergeTablePolicyMap() {
readLock();
try {
- if (!dbIdToPolicyMap.containsKey(dbId)) {
+ if (!typeToPolicyMap.containsKey(PolicyTypeEnum.ROW)) {
return;
}
- List<Policy> policies = dbIdToPolicyMap.get(dbId);
- Map<String, Policy> andMap = new HashMap<>();
- Map<String, Policy> orMap = new HashMap<>();
- for (Policy policy : policies) {
- // read from json, need set isAnalyzed
- policy.getUser().setIsAnalyzed();
- String key =
- Joiner.on("-").join(policy.getTableId(),
policy.getType(), policy.getUser().getQualifiedUser());
- // merge wherePredicate
- if
(CompoundPredicate.Operator.AND.equals(policy.getFilterType().getOp())) {
- Policy frontPolicy = andMap.get(key);
- if (frontPolicy == null) {
- andMap.put(key, policy.clone());
- } else {
- frontPolicy.setWherePredicate(
+ List<Policy> allPolicies = typeToPolicyMap.get(PolicyTypeEnum.ROW);
+ Map<Long, List<RowPolicy>> policyMap = new HashMap<>();
+ dbIdToMergeTablePolicyMap.clear();
+ userPolicySet.clear();
+ for (Policy policy : allPolicies) {
+ if (!(policy instanceof RowPolicy)) {
+ continue;
+ }
+ RowPolicy rowPolicy = (RowPolicy) policy;
+ if (!policyMap.containsKey(rowPolicy.getDbId())) {
+ policyMap.put(rowPolicy.getDbId(), new ArrayList<>());
+ }
+ policyMap.get(rowPolicy.getDbId()).add(rowPolicy);
+ if (rowPolicy.getUser() != null) {
+ userPolicySet.add(rowPolicy.getUser().getQualifiedUser());
+ }
+ }
+ for (Map.Entry<Long, List<RowPolicy>> entry :
policyMap.entrySet()) {
+ List<RowPolicy> policies = entry.getValue();
+ Map<String, RowPolicy> andMap = new HashMap<>();
+ Map<String, RowPolicy> orMap = new HashMap<>();
+ for (RowPolicy rowPolicy : policies) {
+ // read from json, need set isAnalyzed
+ rowPolicy.getUser().setIsAnalyzed();
+ String key =
+ Joiner.on("-").join(rowPolicy.getTableId(),
rowPolicy.getType(),
+ rowPolicy.getUser().getQualifiedUser());
+ // merge wherePredicate
+ if
(CompoundPredicate.Operator.AND.equals(rowPolicy.getFilterType().getOp())) {
+ RowPolicy frontPolicy = andMap.get(key);
+ if (frontPolicy == null) {
+ andMap.put(key, rowPolicy.clone());
+ } else {
+ frontPolicy.setWherePredicate(
new
CompoundPredicate(CompoundPredicate.Operator.AND,
frontPolicy.getWherePredicate(),
- policy.getWherePredicate()));
- andMap.put(key, frontPolicy.clone());
- }
- } else {
- Policy frontPolicy = orMap.get(key);
- if (frontPolicy == null) {
- orMap.put(key, policy.clone());
+ rowPolicy.getWherePredicate()));
+ andMap.put(key, frontPolicy.clone());
+ }
} else {
- frontPolicy.setWherePredicate(
+ RowPolicy frontPolicy = orMap.get(key);
+ if (frontPolicy == null) {
+ orMap.put(key, rowPolicy.clone());
+ } else {
+ frontPolicy.setWherePredicate(
new
CompoundPredicate(CompoundPredicate.Operator.OR,
frontPolicy.getWherePredicate(),
- policy.getWherePredicate()));
- orMap.put(key, frontPolicy.clone());
+ rowPolicy.getWherePredicate()));
+ orMap.put(key, frontPolicy.clone());
+ }
}
}
- }
- Map<String, Policy> mergeMap = new HashMap<>();
- Set<String> policyKeys = new HashSet<>();
- policyKeys.addAll(andMap.keySet());
- policyKeys.addAll(orMap.keySet());
- policyKeys.forEach(key -> {
- if (andMap.containsKey(key) && orMap.containsKey(key)) {
- Policy mergePolicy = andMap.get(key).clone();
- mergePolicy.setWherePredicate(
+ Map<String, RowPolicy> mergeMap = new HashMap<>();
+ Set<String> policyKeys = new HashSet<>();
+ policyKeys.addAll(andMap.keySet());
+ policyKeys.addAll(orMap.keySet());
+ policyKeys.forEach(key -> {
+ if (andMap.containsKey(key) && orMap.containsKey(key)) {
+ RowPolicy mergePolicy = andMap.get(key).clone();
+ mergePolicy.setWherePredicate(
new
CompoundPredicate(CompoundPredicate.Operator.AND,
mergePolicy.getWherePredicate(),
- orMap.get(key).getWherePredicate()));
- mergeMap.put(key, mergePolicy);
- }
- if (!andMap.containsKey(key)) {
- mergeMap.put(key, orMap.get(key));
- }
- if (!orMap.containsKey(key)) {
- mergeMap.put(key, andMap.get(key));
- }
- });
- dbIdToMergePolicyMap.put(dbId, mergeMap);
+ orMap.get(key).getWherePredicate()));
+ mergeMap.put(key, mergePolicy);
+ }
+ if (!andMap.containsKey(key)) {
+ mergeMap.put(key, orMap.get(key));
+ }
+ if (!orMap.containsKey(key)) {
+ mergeMap.put(key, andMap.get(key));
+ }
+ });
+ long dbId = entry.getKey();
+ dbIdToMergeTablePolicyMap.put(dbId, mergeMap);
+ }
} finally {
readUnlock();
}
@@ -345,10 +328,8 @@ public class PolicyMgr implements Writable {
public static PolicyMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
PolicyMgr policyMgr = GsonUtils.GSON.fromJson(json, PolicyMgr.class);
- // update merge policy cache
- policyMgr.updateAllMergePolicyMap();
- // update user policy cache
- policyMgr.updateAllUserPolicySet();
+ // update merge policy cache and userPolicySet
+ policyMgr.updateMergeTablePolicyMap();
return policyMgr;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
index 483b8cd93b..bf82f7e0df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java
@@ -22,5 +22,5 @@ package org.apache.doris.policy;
**/
public enum PolicyTypeEnum {
- ROW
+ ROW, STORAGE
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
copy to fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
index d617dda050..ab4b0a74c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
@@ -26,22 +26,15 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
-import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
@@ -50,41 +43,27 @@ import java.util.List;
* Save policy for filtering data.
**/
@Data
-@AllArgsConstructor
-public class Policy implements Writable, GsonPostProcessable {
+public class RowPolicy extends Policy {
- public static final String ROW_POLICY = "ROW";
+ private static final Logger LOG = LogManager.getLogger(RowPolicy.class);
- private static final Logger LOG = LogManager.getLogger(Policy.class);
+ /**
+ * Policy bind user.
+ **/
+ @SerializedName(value = "user")
+ private UserIdentity user = null;
@SerializedName(value = "dbId")
- private long dbId;
+ private long dbId = -1;
@SerializedName(value = "tableId")
- private long tableId;
-
- @SerializedName(value = "policyName")
- private String policyName;
-
- /**
- * ROW.
- **/
- @SerializedName(value = "type")
- private PolicyTypeEnum type;
+ private long tableId = -1;
/**
* PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type
prevails.
**/
@SerializedName(value = "filterType")
- private final FilterType filterType;
-
- private Expr wherePredicate;
-
- /**
- * Policy bind user.
- **/
- @SerializedName(value = "user")
- private final UserIdentity user;
+ private FilterType filterType = null;
/**
* Use for Serialization/deserialization.
@@ -92,20 +71,32 @@ public class Policy implements Writable,
GsonPostProcessable {
@SerializedName(value = "originStmt")
private String originStmt;
+ private Expr wherePredicate = null;
+
+ public RowPolicy() {}
+
/**
- * Trans stmt to Policy.
- **/
- public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws
AnalysisException {
- String curDb = stmt.getTableName().getDb();
- if (curDb == null) {
- curDb = ConnectContext.get().getDatabase();
- }
- Database db =
Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
- Table table =
db.getTableOrAnalysisException(stmt.getTableName().getTbl());
- UserIdentity userIdent = stmt.getUser();
- userIdent.analyze(ConnectContext.get().getClusterName());
- return new Policy(db.getId(), table.getId(), stmt.getPolicyName(),
stmt.getType(), stmt.getFilterType(),
- stmt.getWherePredicate(), userIdent,
stmt.getOrigStmt().originStmt);
+ * Policy for Table. Policy of ROW or others.
+ *
+ * @param type PolicyType
+ * @param policyName policy name
+ * @param dbId database id
+ * @param user username
+ * @param originStmt origin stmt
+ * @param tableId table id
+ * @param filterType filter type
+ * @param wherePredicate where predicate
+ */
+ public RowPolicy(final PolicyTypeEnum type, final String policyName, long
dbId,
+ UserIdentity user, String originStmt, final long tableId,
+ final FilterType filterType, final Expr wherePredicate) {
+ super(type, policyName);
+ this.user = user;
+ this.dbId = dbId;
+ this.tableId = tableId;
+ this.filterType = filterType;
+ this.originStmt = originStmt;
+ this.wherePredicate = wherePredicate;
}
/**
@@ -118,19 +109,6 @@ public class Policy implements Writable,
GsonPostProcessable {
this.filterType.name(), this.wherePredicate.toSql(),
this.user.getQualifiedUser(), this.originStmt);
}
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, GsonUtils.GSON.toJson(this));
- }
-
- /**
- * Read policy from file.
- **/
- public static Policy read(DataInput in) throws IOException {
- String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, Policy.class);
- }
-
@Override
public void gsonPostProcess() throws IOException {
if (wherePredicate != null) {
@@ -142,13 +120,44 @@ public class Policy implements Writable,
GsonPostProcessable {
CreatePolicyStmt stmt = (CreatePolicyStmt)
SqlParserUtils.getFirstStmt(parser);
wherePredicate = stmt.getWherePredicate();
} catch (Exception e) {
- throw new IOException("policy parse originStmt error", e);
+ throw new IOException("table policy parse originStmt error", e);
+ }
+ }
+
+ @Override
+ public RowPolicy clone() {
+ return new RowPolicy(this.type, this.policyName, this.dbId, this.user,
this.originStmt, this.tableId,
+ this.filterType, this.wherePredicate);
+ }
+
+ private boolean checkMatched(long dbId, long tableId, PolicyTypeEnum type,
+ String policyName, UserIdentity user) {
+ return super.checkMatched(type, policyName)
+ && (dbId == -1 || dbId == this.dbId)
+ && (tableId == -1 || tableId == this.tableId)
+ && (user == null || this.user == null
+ || StringUtils.equals(user.getQualifiedUser(),
this.user.getQualifiedUser()));
+ }
+
+ @Override
+ public boolean matchPolicy(Policy checkedPolicyCondition) {
+ if (!(checkedPolicyCondition instanceof RowPolicy)) {
+ return false;
}
+ RowPolicy rowPolicy = (RowPolicy) checkedPolicyCondition;
+ return checkMatched(rowPolicy.getDbId(), rowPolicy.getTableId(),
rowPolicy.getType(),
+ rowPolicy.getPolicyName(), rowPolicy.getUser());
+ }
+
+ @Override
+ public boolean matchPolicy(DropPolicyLog checkedDropPolicyLogCondition) {
+ return checkMatched(checkedDropPolicyLogCondition.getDbId(),
checkedDropPolicyLogCondition.getTableId(),
+ checkedDropPolicyLogCondition.getType(),
checkedDropPolicyLogCondition.getPolicyName(),
+ checkedDropPolicyLogCondition.getUser());
}
@Override
- public Policy clone() {
- return new Policy(this.dbId, this.tableId, this.policyName, this.type,
this.filterType, this.wherePredicate,
- this.user, this.originStmt);
+ public boolean isInvalid() {
+ return (wherePredicate == null);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
index 6bbc977e71..5a9c0b905e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.policy;
import org.apache.doris.analysis.CreateUserStmt;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.analysis.TablePattern;
@@ -36,6 +37,11 @@ import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.List;
/**
@@ -199,4 +205,40 @@ public class PolicyTest extends TestWithFeService {
dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1");
dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1");
}
+
+ @Test
+ public void testReadWrite() throws IOException, AnalysisException {
+ PolicyTypeEnum type = PolicyTypeEnum.ROW;
+ String policyName = "policy_name";
+ long dbId = 10;
+ UserIdentity user = new UserIdentity("test_policy", "%");
+ String originStmt = "CREATE ROW POLICY test_row_policy ON test.table1"
+ + " AS PERMISSIVE TO test_policy USING (k1 = 1)";
+ long tableId = 100;
+ FilterType filterType = FilterType.PERMISSIVE;
+ Expr wherePredicate = null;
+
+ Policy rowPolicy = new RowPolicy(type, policyName, dbId, user,
+ originStmt, tableId, filterType,
wherePredicate);
+
+ ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(emptyOutputStream);
+ rowPolicy.write(output);
+ byte[] bytes = emptyOutputStream.toByteArray();
+ System.out.println(emptyOutputStream.toString());
+ DataInputStream input = new DataInputStream(new
ByteArrayInputStream(bytes));
+
+ Policy newPolicy = Policy.read(input);
+ Assertions.assertTrue(newPolicy instanceof RowPolicy);
+ RowPolicy newRowPolicy = (RowPolicy) newPolicy;
+ Assertions.assertEquals(type, newRowPolicy.getType());
+ Assertions.assertEquals(policyName, newRowPolicy.getPolicyName());
+ Assertions.assertEquals(dbId, newRowPolicy.getDbId());
+ user.analyze(SystemInfoService.DEFAULT_CLUSTER);
+ newRowPolicy.getUser().analyze(SystemInfoService.DEFAULT_CLUSTER);
+ Assertions.assertEquals(user.getQualifiedUser(),
newRowPolicy.getUser().getQualifiedUser());
+ Assertions.assertEquals(originStmt, newRowPolicy.getOriginStmt());
+ Assertions.assertEquals(tableId, newRowPolicy.getTableId());
+ Assertions.assertEquals(filterType, newRowPolicy.getFilterType());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]