This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
     new 97d6dfc  SUBMARINE-418. Support Data Masking for spark-security
97d6dfc is described below

commit 97d6dfc9e1c24f3869e6802a4afea58472b75ec4
Author: Kent Yao <yaooq...@hotmail.com>
AuthorDate: Wed Mar 11 14:21:42 2020 +0800

    SUBMARINE-418. Support Data Masking for spark-security

    ### What is this PR for?
    Add data masking for spark-security, which enables administrators to mask
    sensitive data per column. E.g. the phone number 12345678901 can be masked
    to show only its last 4 digits: nnnnnnn8901

    ### What type of PR is it?
    Feature

    ### Todos

    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-418

    ### How should this be tested?
    Add unit tests.

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Kent Yao <yaooq...@hotmail.com>

    Closes #219 from yaooqinn/SUBMARINE-418 and squashes the following commits:

    d2a70ea [Kent Yao] nit
    95cc726 [Kent Yao] SUBMARINE-418. Support Data Masking
---
 .../optimizer/SubmarineDataMaskingExtension.scala  | 227 +++++++++++++++++++++
 .../plans/logical/SubmarineDataMasking.scala}      |  18 +-
 .../execution/SubmarineSparkPlanOmitStrategy.scala |   5 +-
 .../spark/security/RangerSparkSQLExtension.scala   |   3 +-
 .../src/test/resources/sparkSql_hive_jenkins.json  |  64 ++++++
 .../org/apache/spark/sql/SubmarineSparkUtils.scala |   7 +-
 .../SubmarineDataMaskingExtensionTest.scala        |  48 +++++
 .../spark/security/DataMaskingSQLTest.scala        | 189 +++++++++++++++++
 8 files changed, 546 insertions(+), 15 deletions(-)
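[Editor's note] For a concrete feel for the semantics described above: the new
optimizer rule registers Hive's built-in masking UDFs (see
SubmarineDataMaskingExtension below), whose defaults replace upper-case
characters with X, lower-case with x, and digits with n. A minimal sketch, not
part of this patch, assuming a SparkSession `spark` in which the rule has
already registered the UDFs:

    // Illustrative only: MASK_SHOW_LAST_4 semantics via Hive's
    // GenericUDFMaskShowLastN, registered by this patch as mask_show_last_n.
    // Everything but the last 4 characters is masked; digits become 'n'.
    spark.sql("SELECT mask_show_last_n('12345678901', 4)").collect()
    // expected single result: nnnnnnn8901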
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
new file mode 100644
index 0000000..7e2b99c
--- /dev/null
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.ranger.plugin.model.RangerPolicy
+import org.apache.ranger.plugin.policyengine.RangerAccessResult
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project, SubmarineDataMasking, Subquery}
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateViewCommand, InsertIntoDataSourceDirCommand}
+import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
+
+import org.apache.submarine.spark.security.{RangerSparkAccessRequest, RangerSparkAuditHandler, RangerSparkPlugin, RangerSparkResource, SparkAccessType}
+
+/**
+ * An Apache Spark [[Optimizer]] extension for column data masking.
+ */
+case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[LogicalPlan] {
+  import RangerPolicy._
+
+  // register all built-in masking udfs
+  Map("mask" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMask",
+    "mask_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskFirstN",
+    "mask_hash" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash",
+    "mask_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskLastN",
+    "mask_show_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowFirstN",
+    "mask_show_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowLastN")
+    .map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
+    .foreach(spark.sessionState.catalog.registerFunction(_, true))
+
+  private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
+  private lazy val sqlParser = spark.sessionState.sqlParser
+  private lazy val analyzer = spark.sessionState.analyzer
+  private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+
+  /**
+   * Collecting transformers from Ranger data masking policies, and mapping them to the
+   * [[LogicalPlan]] output attributes.
+   *
+   * @param plan the original logical plan with an underlying catalog table
+   * @param table the catalog table
+   * @return a list of key-value pairs of original expressions and their masking representations
+   */
+  private def collectTransformers(
+      plan: LogicalPlan,
+      table: CatalogTable,
+      aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
+    val auditHandler = new RangerSparkAuditHandler()
+    val ugi = UserGroupInformation.getCurrentUser
+    val userName = ugi.getShortUserName
+    val groups = ugi.getGroupNames.toSet
+    try {
+      val identifier = table.identifier
+      import org.apache.submarine.spark.security.SparkObjectType._
+
+      val maskEnableResults = plan.output.map { expr =>
+        val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, expr.name)
+        val req = new RangerSparkAccessRequest(resource, userName, groups, COLUMN.toString,
+          SparkAccessType.SELECT, sparkPlugin.getClusterName)
+        (expr, sparkPlugin.evalDataMaskPolicies(req, auditHandler))
+      }.filter(x => isMaskEnabled(x._2))
+
+      val originMaskers = maskEnableResults.map { case (expr, result) =>
+        if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
+          val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
+          val plan = analyzer.execute(sqlParser.parsePlan(sql))
+          (expr, plan)
+        } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
+          val maskVal = result.getMaskedValue
+          if (maskVal == null) {
+            val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
+            val plan = analyzer.execute(sqlParser.parsePlan(sql))
+            (expr, plan)
+          } else {
+            val sql = s"SELECT ${maskVal.replace("{col}", expr.name)} AS ${expr.name} FROM" +
+              s" ${table.qualifiedName}"
+            val plan = analyzer.execute(sqlParser.parsePlan(sql))
+            (expr, plan)
+          }
+        } else if (result.getMaskTypeDef != null) {
+          val transformer = result.getMaskTypeDef.getTransformer
+          if (StringUtils.isNotEmpty(transformer)) {
+            val trans = transformer.replace("{col}", expr.name)
+            val sql = s"SELECT $trans AS ${expr.name} FROM ${table.qualifiedName}"
+            val plan = analyzer.execute(sqlParser.parsePlan(sql))
+            (expr, plan)
+          } else {
+            (expr, null)
+          }
+        } else {
+          (expr, null)
+        }
+      }.filter(_._2 != null)
+
+      val formedMaskers: Map[ExprId, Alias] =
+        originMaskers.map { case (expr, p) => (expr, p.asInstanceOf[Project].projectList.head) }
+          .map { case (expr, attr) =>
+            val originalAlias = attr.asInstanceOf[Alias]
+            val newChild = originalAlias.child mapChildren {
+              case _: AttributeReference => expr
+              case o => o
+            }
+            val newAlias = originalAlias.copy(child = newChild)(
+              originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+            (expr.exprId, newAlias)
+          }.toMap
+
+      val aliasedMaskers = new mutable.HashMap[ExprId, Alias]()
+      for ((alias, id) <- aliases if formedMaskers.contains(id)) {
+        val originalAlias = formedMaskers(id)
+        val newChild = originalAlias.child mapChildren {
+          case ar: AttributeReference =>
+            ar.copy(name = alias.name)(alias.exprId, alias.qualifier)
+          case o => o
+        }
+        val newAlias = originalAlias.copy(child = newChild, alias.name)(
+          originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+        aliasedMaskers.put(alias.exprId, newAlias)
+      }
+      formedMaskers ++ aliasedMaskers
+    } catch {
+      case e: Exception => throw e
+    }
+  }
+
+  private def isMaskEnabled(result: RangerAccessResult): Boolean = {
+    result != null && result.isMaskEnabled
+  }
+
+  private def hasCatalogTable(plan: LogicalPlan): Boolean = plan match {
+    case _: HiveTableRelation => true
+    case l: LogicalRelation if l.catalogTable.isDefined => true
+    case _ => false
+  }
+
+  private def collectAllAliases(plan: LogicalPlan): mutable.HashMap[Alias, ExprId] = {
+    val aliases = new mutable.HashMap[Alias, ExprId]()
+    plan.transformAllExpressions {
+      case a: Alias =>
+        a.child match {
+          case ne: NamedExpression =>
+            aliases.put(a, ne.exprId)
+          case _ =>
+        }
+        a
+    }
+    aliases
+  }
+
+  private def collectAllTransformers(
+      plan: LogicalPlan,
+      aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
+    plan.collectLeaves().flatMap {
+      case h: HiveTableRelation =>
+        collectTransformers(h, h.tableMeta, aliases)
+      case l: LogicalRelation if l.catalogTable.isDefined =>
+        collectTransformers(l, l.catalogTable.get, aliases)
+      case _ => Seq.empty
+    }.toMap
+  }
+
+  private def doMasking(plan: LogicalPlan): LogicalPlan = plan match {
+    case s: Subquery => s
+    case m: SubmarineDataMasking => m // escape the optimize iteration if already masked
+    case fixed if fixed.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty => fixed
+    case _ =>
+      val aliases = collectAllAliases(plan)
+      val transformers = collectAllTransformers(plan, aliases)
+      val newPlan =
+        if (transformers.nonEmpty && plan.output.exists(o => transformers.get(o.exprId).nonEmpty)) {
+          val newOutput = plan.output.map(attr => transformers.getOrElse(attr.exprId, attr))
+          Project(newOutput, plan)
+        } else {
+          plan
+        }
+
+      val marked = newPlan transformUp {
+        case p if hasCatalogTable(p) => SubmarineDataMasking(p)
+      }
+
+      marked transformAllExpressions {
+        case s: SubqueryExpression =>
+          val Subquery(newPlan) =
+            rangerSparkOptimizer.execute(Subquery(SubmarineDataMasking(s.plan)))
+          s.withNewPlan(newPlan)
+      }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case c: Command => c match {
+      case c: CreateDataSourceTableAsSelectCommand => c.copy(query = doMasking(c.query))
+      case c: CreateHiveTableAsSelectCommand => c.copy(query = doMasking(c.query))
+      case c: CreateViewCommand => c.copy(child = doMasking(c.child))
+      case i: InsertIntoDataSourceCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoDataSourceDirCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoHadoopFsRelationCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoHiveDirCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoHiveTable => i.copy(query = doMasking(i.query))
+      case s: SaveIntoDataSourceCommand => s.copy(query = doMasking(s.query))
+      case cmd => cmd
+    }
+    case other => doMasking(other)
+  }
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
similarity index 60%
copy from submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
copy to submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
index ae9aad8..4eba657 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
@@ -15,18 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
 /**
- * An Apache Spark's [[Strategy]] extension for omitting marker for row level filtering and data
- * masking.
+ * A marker [[LogicalPlan]] for column data masking, which will be removed during
+ * LogicalPlan -> PhysicalPlan
  */
-case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends Strategy {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case SubmarineRowFilter(child) => planLater(child) :: Nil
-    case _ => Nil
-  }
-}
\ No newline at end of file
+case class SubmarineDataMasking(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
index ae9aad8..b768eb6 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineDataMasking, SubmarineRowFilter}
 
 /**
  * An Apache Spark's [[Strategy]] extension for omitting marker for row level filtering and data
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFil
 case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case SubmarineRowFilter(child) => planLater(child) :: Nil
+    case SubmarineDataMasking(child) => planLater(child) :: Nil
     case _ => Nil
   }
-}
\ No newline at end of file
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
index 80dc127..e90e2f3 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
@@ -18,13 +18,14 @@
 package org.apache.submarine.spark.security
 
 import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
 import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 class RangerSparkSQLExtension extends Extensions {
   override def apply(ext: SparkSessionExtensions): Unit = {
     ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
     ext.injectOptimizerRule(SubmarineRowFilterExtension)
+    ext.injectOptimizerRule(SubmarineDataMaskingExtension)
     ext.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 }
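[Editor's note] An extension bundle like RangerSparkSQLExtension above is
normally activated through Spark's spark.sql.extensions configuration. A
minimal sketch (illustrative, not part of this patch; the app name is made up):

    import org.apache.spark.sql.SparkSession

    // Activate the Submarine spark-security rules on a plain SparkSession.
    val spark = SparkSession.builder()
      .appName("spark-security-demo") // hypothetical
      .config("spark.sql.extensions",
        "org.apache.submarine.spark.security.RangerSparkSQLExtension")
      .enableHiveSupport()
      .getOrCreate()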
diff --git a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
index 6691216..b356fd5 100644
--- a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
+++ b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
@@ -2128,6 +2128,70 @@
       "guid": "98a04cd7-8d14-4466-adc9-126d87a3af69",
       "isEnabled": true,
       "version": 1
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "rangertbl5_value_show_last_4",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "rangertbl5"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_SHOW_LAST_4"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 32,
+      "guid": "b3f1f1e0-2bd6-4b20-8a32-a531006ae151",
+      "isEnabled": true,
+      "version": 1
     }
   ],
   "serviceDef": {
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
index 1472413..7f29cd7 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
 import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 object SubmarineSparkUtils {
@@ -40,4 +40,9 @@ object SubmarineSparkUtils {
     spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
     spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
+
+  def enableDataMasking(spark: SparkSession): Unit = {
+    spark.extensions.injectOptimizerRule(SubmarineDataMaskingExtension)
+    spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
+  }
 }
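[Editor's note] The tests below run statements through SubmarineSparkUtils.withUser,
whose body is outside this diff; judging from the imports above, it presumably
wraps UserGroupInformation.doAs. A sketch of that assumed shape (not the actual
helper):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Assumed helper shape: run a block as the given user so that Ranger
    // policies keyed on the user name (e.g. "bob") take effect.
    def withUser[T](user: String)(f: => T): T = {
      val ugi = UserGroupInformation.createRemoteUser(user)
      ugi.doAs(new PrivilegedExceptionAction[T] {
        override def run(): T = f
      })
    }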
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala
new file mode 100644
index 0000000..be12a93
--- /dev/null
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{Project, SubmarineDataMasking}
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineDataMaskingExtensionTest extends FunSuite with BeforeAndAfterAll {
+
+  private val spark = TestHive.sparkSession.newSession()
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+  test("applying condition to original query if data masking exists in ranger") {
+    val extension = SubmarineDataMaskingExtension(spark)
+    val frame = spark.sql("select * from src")
+    SubmarineSparkUtils.withUser("bob") {
+      val plan = extension.apply(frame.queryExecution.optimizedPlan)
+      assert(plan.asInstanceOf[Project].projectList(1).collectLeaves().length === 7)
+      assert(plan.children.head.isInstanceOf[SubmarineDataMasking])
+    }
+
+    SubmarineSparkUtils.withUser("alice") {
+      val plan = extension.apply(frame.queryExecution.optimizedPlan)
+      assert(plan.isInstanceOf[SubmarineDataMasking])
+    }
+  }
+}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
new file mode 100644
index 0000000..9a4d23c
--- /dev/null
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.spark.security
+
+import org.apache.commons.codec.digest.DigestUtils
+import org.apache.spark.sql.SubmarineSparkUtils.{enableDataMasking, withUser}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, SubmarineDataMasking}
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+case class DataMaskingSQLTest() extends FunSuite with BeforeAndAfterAll {
+  private val spark = TestHive.sparkSession.newSession()
+  private lazy val sql = spark.sql _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl3 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl4 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl5 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl6 AS SELECT * FROM default.src
+      """.stripMargin)
+    enableDataMasking(spark)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+  test("simple query") {
+    val statement = "select * from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      assert(df.queryExecution.optimizedPlan.isInstanceOf[Project])
+      val project = df.queryExecution.optimizedPlan.asInstanceOf[Project]
+      val masker = project.projectList(1)
+      assert(masker.name === "value")
+      assert(masker.children.exists(_.sql.contains("mask_show_last_n")))
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+    withUser("alice") {
+      assert(!sql(statement).take(1)(0).getString(1).startsWith("x"))
+    }
+  }
+
+  test("projection with ranger filter key") {
+    withUser("bob") {
+      val statement = "select key from default.src where key = 0"
+      val df = sql(statement)
+      assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) === 0, "key is not masked")
+    }
+    withUser("bob") {
+      val statement = "select value from default.src where key = 0"
+      val df = sql(statement)
+      assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      val row = df.take(1)(0)
+      assert(row.getString(0).startsWith("x"), "value is masked")
+    }
+  }
+
+  test("alias") {
+    val statement = "select key as k1, value v1 from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+  }
+
+  test("agg") {
+    val statement = "select sum(key) as k1, value v1 from default.src group by v1"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("val"), "values should not be masked")
+    }
+  }
+
+  test("MASK") {
+    val statement = "select * from default.rangertbl1"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+  }
+
+  test("MASK_SHOW_FIRST_4") {
+    val statement = "select * from default.rangertbl2"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("val_x"), "values should show first 4 characters")
+    }
+  }
+
+  test("MASK_HASH") {
+    val statement = "select * from default.rangertbl3 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === DigestUtils.md5Hex("val_277"), "value is hashed")
+    }
+  }
+
+  test("MASK_SHOW_LAST_4") {
+    val statement = "select * from default.rangertbl5 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
+    }
+  }
+
+  test("NO MASKING") {
+    val statement = "select * from default.rangertbl6 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === "val_277", "value has no mask")
+    }
+  }
+
+  test("commands") {
+    withUser("bob") {
+      val statement = "create view v1 as select * from default.rangertbl5 where value = 'val_277'"
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+
+      val row = sql("select * from v1").take(1)(0)
+      assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@submarine.apache.org
For additional commands, e-mail: dev-h...@submarine.apache.org