This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c1007c2  [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
c1007c2 is described below

commit c1007c2f7c4fafd1568e08d45164af0956c445b7
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Wed May 29 13:32:21 2019 -0700

    [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase

    ## What changes were proposed in this pull request?

    To follow https://github.com/apache/spark/pull/17397, the output of FileTable and
    DataSourceV2ScanExecBase can contain sensitive information (like Amazon keys).
    Such information should not end up in logs or be exposed to non-privileged users.
    This PR adds a redaction facility for these outputs to resolve the issue. A user can
    enable it by setting a regex in the same spark.redaction.string.regex configuration
    used by the V1 sources.

    ## How was this patch tested?

    Unit test.

    Closes #24719 from gengliangwang/RedactionSuite.

    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../datasources/v2/DataSourceV2ScanExecBase.scala  |  5 +-
 .../datasources/v2/FileDataSourceV2.scala          | 11 ++-
 .../sql/execution/datasources/v2/FileScan.scala    | 17 ++++
 .../sql/execution/datasources/v2/orc/OrcScan.scala |  4 +
 .../DataSourceScanExecRedactionSuite.scala         | 96 ++++++++++++++++++----
 5 files changed, 114 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index da71e78..9ad683f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
+import org.apache.spark.util.Utils
 
 trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
 
@@ -35,7 +36,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
   def readerFactory: PartitionReaderFactory
 
   override def simpleString(maxFields: Int): String = {
-    s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+    val result =
+      s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+    Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result)
   }
 
   override def outputPartitioning: physical.Partitioning = scan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index d60d429..bcb10ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -17,12 +17,14 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -49,6 +51,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   }
 
   protected def getTableName(paths: Seq[String]): String = {
-    shortName() + ":" + paths.mkString(";")
+    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+    Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
+  }
+
+  private def qualifiedPathName(path: String): String = {
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 84a1274..b2f3c4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.{Locale, OptionalLong}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class FileScan(
     sparkSession: SparkSession,
@@ -42,6 +44,21 @@ abstract class FileScan(
     false
   }
 
+  override def description(): String = {
+    val locationDesc =
+      fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+    val metadata: Map[String, String] = Map(
+      "ReadSchema" -> readDataSchema.catalogString,
+      "Location" -> locationDesc)
+    val metadataStr = metadata.toSeq.sorted.map {
+      case (key, value) =>
+        val redactedValue =
+          Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
+        key + ": " + StringUtils.abbreviate(redactedValue, 100)
+    }.mkString(", ")
+    s"${this.getClass.getSimpleName} $metadataStr"
+  }
+
   protected def partitions: Seq[FilePartition] = {
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index b129c94..a4fb034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -58,4 +58,8 @@ case class OrcScan(
   }
 
   override def hashCode(): Int = getClass.hashCode()
+
+  override def description(): String = {
+    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index 11a1c9a..ec59459 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -19,26 +19,35 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
- * Suite that tests the redaction of DataSourceScanExec
+ * Test suite base for testing the redaction of DataSourceScanExec/BatchScanExec.
  */
-class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+abstract class DataSourceScanRedactionTest extends QueryTest with SharedSQLContext {
 
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.redaction.string.regex", "file:/[\\w_]+")
+    .set("spark.redaction.string.regex", "file:/[\\w-_@/]+")
+
+  final protected def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
+    queryExecution.toString.contains(msg) ||
+    queryExecution.simpleString.contains(msg) ||
+    queryExecution.stringWithStats.contains(msg)
+  }
+
+  protected def getRootPath(df: DataFrame): Path
 
   test("treeString is redacted") {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
-      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      val df = spark.read.parquet(basePath)
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
 
-      val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
-        .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+      val rootPath = getRootPath(df)
       assert(rootPath.toString.contains(dir.toURI.getPath.stripSuffix("/")))
 
       assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
@@ -53,18 +62,24 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.simpleString.contains(replacement))
     }
   }
+}
 
-  private def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
-    queryExecution.toString.contains(msg) ||
-    queryExecution.simpleString.contains(msg) ||
-    queryExecution.stringWithStats.contains(msg)
-  }
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")
+
+  override protected def getRootPath(df: DataFrame): Path =
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
 
   test("explain is redacted using SQLConf") {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
-      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      val df = spark.read.parquet(basePath)
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
       val replacement = "*********"
 
       // Respect SparkConf and replace file:/
@@ -86,8 +101,8 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
 
   test("FileSourceScanExec metadata") {
     withTempPath { path =>
       val dir = path.getCanonicalPath
-      spark.range(0, 10).write.parquet(dir)
-      val df = spark.read.parquet(dir)
+      spark.range(0, 10).write.orc(dir)
+      val df = spark.read.orc(dir)
 
       assert(isIncluded(df.queryExecution, "Format"))
       assert(isIncluded(df.queryExecution, "ReadSchema"))
@@ -98,5 +113,52 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(isIncluded(df.queryExecution, "Location"))
     }
   }
+}
+
+/**
+ * Suite that tests the redaction of BatchScanExec.
+ */
+class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
+
+  override protected def getRootPath(df: DataFrame): Path =
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+      .asInstanceOf[BatchScanExec].scan.asInstanceOf[OrcScan].fileIndex.rootPaths.head
+
+  test("explain is redacted using SQLConf") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
+      val replacement = "*********"
+
+      // Respect SparkConf and replace file:/
+      assert(isIncluded(df.queryExecution, replacement))
+      assert(isIncluded(df.queryExecution, "BatchScan"))
+      assert(!isIncluded(df.queryExecution, "file:/"))
+
+      withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> "(?i)BatchScan") {
+        // Respect SQLConf and replace FileScan
+        assert(isIncluded(df.queryExecution, replacement))
+
+        assert(!isIncluded(df.queryExecution, "BatchScan"))
+        assert(isIncluded(df.queryExecution, "file:/"))
+      }
+    }
+  }
+
+  test("FileScan description") {
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
+      spark.range(0, 10).write.orc(dir)
+      val df = spark.read.orc(dir)
+
+      assert(isIncluded(df.queryExecution, "ReadSchema"))
+      assert(isIncluded(df.queryExecution, "BatchScan"))
+      assert(isIncluded(df.queryExecution, "PushedFilters"))
+      assert(isIncluded(df.queryExecution, "Location"))
+    }
+  }
 }
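A minimal usage sketch of the behavior described in the commit message, illustrative only and not taken from the patch above: it assumes a local SparkSession and a throwaway ORC dataset, and reuses the file:/ regex from the test suite. Whichever scan node ends up in the plan (the V1 FileSourceScan or the V2 BatchScan covered by this patch), the local path should appear redacted in the explain output.

```scala
import org.apache.spark.sql.SparkSession

object RedactionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("redaction-sketch")
      // Same knob mentioned in the commit message: substrings of the plan/tree
      // strings matching this regex are replaced with Spark's redaction marker.
      .config("spark.redaction.string.regex", "file:/[\\w-_@/]+")
      .getOrCreate()

    // Write and re-read a small ORC dataset under a temporary directory.
    val dir = java.nio.file.Files.createTempDirectory("redaction-sketch").resolve("data").toString
    spark.range(0, 10).write.orc(dir)
    val df = spark.read.orc(dir)

    // The local file:/ path of the scan should no longer appear in the plan output.
    df.explain(true)

    spark.stop()
  }
}
```

The pattern can also be adjusted per session through the SQL-level setting referenced as SQLConf.SQL_STRING_REDACTION_PATTERN in the test suite above.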