This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1007c2  [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
c1007c2 is described below

commit c1007c2f7c4fafd1568e08d45164af0956c445b7
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Wed May 29 13:32:21 2019 -0700

    [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
    
    ## What changes were proposed in this pull request?
    
    As with https://github.com/apache/spark/pull/17397, the output of FileTable and
    DataSourceV2ScanExecBase can contain sensitive information (such as Amazon keys).
    Such information should not end up in logs or be exposed to non-privileged users.
    
    This PR adds a redaction facility for these outputs to resolve the issue. A user
    can enable it by setting a regex in the same spark.redaction.string.regex
    configuration that the V1 file sources use.
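    For illustration, a minimal spark-shell sketch of the intended behavior. The
    dataset path below is hypothetical, and the runtime key
    spark.sql.redaction.string.regex (which falls back to spark.redaction.string.regex)
    is assumed; the regex is the one used in the updated test suite.

    ```scala
    // Hypothetical dataset whose path should not leak into plan strings.
    spark.range(0, 10).write.orc("/tmp/secret-area/data")

    // Redact anything that looks like a local file URI from string plan output.
    spark.conf.set("spark.sql.redaction.string.regex", "file:/[\\w-_@/]+")

    // With this patch, the BatchScan node's description/Location should show a
    // redaction placeholder instead of the real file: URI.
    spark.read.orc("/tmp/secret-area/data").explain()
    ```
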
    ## How was this patch tested?
    
    Unit test
    
    Closes #24719 from gengliangwang/RedactionSuite.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../datasources/v2/DataSourceV2ScanExecBase.scala  |  5 +-
 .../datasources/v2/FileDataSourceV2.scala          | 11 ++-
 .../sql/execution/datasources/v2/FileScan.scala    | 17 ++++
 .../sql/execution/datasources/v2/orc/OrcScan.scala |  4 +
 .../DataSourceScanExecRedactionSuite.scala         | 96 ++++++++++++++++++----
 5 files changed, 114 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index da71e78..9ad683f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
+import org.apache.spark.util.Utils
 
 trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
 
@@ -35,7 +36,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
   def readerFactory: PartitionReaderFactory
 
   override def simpleString(maxFields: Int): String = {
-    s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} 
${scan.description()}"
+    val result =
+      s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} 
${scan.description()}"
+    Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result)
   }
 
   override def outputPartitioning: physical.Partitioning = scan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index d60d429..bcb10ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -17,12 +17,14 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -49,6 +51,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   }
 
   protected def getTableName(paths: Seq[String]): String = {
-    shortName() + ":" + paths.mkString(";")
+    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+    Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
+  }
+
+  private def qualifiedPathName(path: String): String = {
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 84a1274..b2f3c4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.{Locale, OptionalLong}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class FileScan(
     sparkSession: SparkSession,
@@ -42,6 +44,21 @@ abstract class FileScan(
     false
   }
 
+  override def description(): String = {
+    val locationDesc =
+      fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+    val metadata: Map[String, String] = Map(
+      "ReadSchema" -> readDataSchema.catalogString,
+      "Location" -> locationDesc)
+    val metadataStr = metadata.toSeq.sorted.map {
+      case (key, value) =>
+        val redactedValue =
+          Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
+        key + ": " + StringUtils.abbreviate(redactedValue, 100)
+    }.mkString(", ")
+    s"${this.getClass.getSimpleName} $metadataStr"
+  }
+
   protected def partitions: Seq[FilePartition] = {
     val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index b129c94..a4fb034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -58,4 +58,8 @@ case class OrcScan(
   }
 
   override def hashCode(): Int = getClass.hashCode()
+
+  override def description(): String = {
+    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index 11a1c9a..ec59459 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -19,26 +19,35 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
- * Suite that tests the redaction of DataSourceScanExec
+ * Test suite base for testing the redaction of DataSourceScanExec/BatchScanExec.
  */
-class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+abstract class DataSourceScanRedactionTest extends QueryTest with SharedSQLContext {
 
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.redaction.string.regex", "file:/[\\w_]+")
+    .set("spark.redaction.string.regex", "file:/[\\w-_@/]+")
+
+  final protected def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
+    queryExecution.toString.contains(msg) ||
+      queryExecution.simpleString.contains(msg) ||
+      queryExecution.stringWithStats.contains(msg)
+  }
+
+  protected def getRootPath(df: DataFrame): Path
 
   test("treeString is redacted") {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
-      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      val df = spark.read.parquet(basePath)
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
 
-      val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
-        .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+      val rootPath = getRootPath(df)
       assert(rootPath.toString.contains(dir.toURI.getPath.stripSuffix("/")))
 
       assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
@@ -53,18 +62,24 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.simpleString.contains(replacement))
     }
   }
+}
 
-  private def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
-    queryExecution.toString.contains(msg) ||
-    queryExecution.simpleString.contains(msg) ||
-    queryExecution.stringWithStats.contains(msg)
-  }
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")
+
+  override protected def getRootPath(df: DataFrame): Path =
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
 
   test("explain is redacted using SQLConf") {
     withTempDir { dir =>
       val basePath = dir.getCanonicalPath
-      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
-      val df = spark.read.parquet(basePath)
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
       val replacement = "*********"
 
       // Respect SparkConf and replace file:/
@@ -86,8 +101,8 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
   test("FileSourceScanExec metadata") {
     withTempPath { path =>
       val dir = path.getCanonicalPath
-      spark.range(0, 10).write.parquet(dir)
-      val df = spark.read.parquet(dir)
+      spark.range(0, 10).write.orc(dir)
+      val df = spark.read.orc(dir)
 
       assert(isIncluded(df.queryExecution, "Format"))
       assert(isIncluded(df.queryExecution, "ReadSchema"))
@@ -98,5 +113,52 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
       assert(isIncluded(df.queryExecution, "Location"))
     }
   }
+}
+
+/**
+ * Suite that tests the redaction of BatchScanExec.
+ */
+class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
 
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
+
+  override protected def getRootPath(df: DataFrame): Path =
+    df.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+      .asInstanceOf[BatchScanExec].scan.asInstanceOf[OrcScan].fileIndex.rootPaths.head
+
+  test("explain is redacted using SQLConf") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+      spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+      val df = spark.read.orc(basePath)
+      val replacement = "*********"
+
+      // Respect SparkConf and replace file:/
+      assert(isIncluded(df.queryExecution, replacement))
+      assert(isIncluded(df.queryExecution, "BatchScan"))
+      assert(!isIncluded(df.queryExecution, "file:/"))
+
+      withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> "(?i)BatchScan") {
+        // Respect SQLConf and replace FileScan
+        assert(isIncluded(df.queryExecution, replacement))
+
+        assert(!isIncluded(df.queryExecution, "BatchScan"))
+        assert(isIncluded(df.queryExecution, "file:/"))
+      }
+    }
+  }
+
+  test("FileScan description") {
+    withTempPath { path =>
+      val dir = path.getCanonicalPath
+      spark.range(0, 10).write.orc(dir)
+      val df = spark.read.orc(dir)
+
+      assert(isIncluded(df.queryExecution, "ReadSchema"))
+      assert(isIncluded(df.queryExecution, "BatchScan"))
+      assert(isIncluded(df.queryExecution, "PushedFilters"))
+      assert(isIncluded(df.queryExecution, "Location"))
+    }
+  }
 }
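
For context, a minimal standalone sketch of the string-redaction idea these overrides
rely on: every regex match in a description or plan string is replaced with a
placeholder. This is an illustration only, not Spark's actual Utils.redact
implementation; the placeholder text and the example description line are assumptions.

```scala
import scala.util.matching.Regex

object RedactionSketch {
  // Assumed placeholder; Spark defines its own replacement constant.
  private val Replacement = "*********(redacted)"

  // Replace every match of the configured pattern, leaving the text untouched
  // when no pattern is set or the text is empty.
  def redact(pattern: Option[Regex], text: String): String = pattern match {
    case Some(regex) if text != null && text.nonEmpty =>
      regex.replaceAllIn(text, Replacement)
    case _ => text
  }

  def main(args: Array[String]): Unit = {
    val desc = "OrcScan Location: InMemoryFileIndex[file:/tmp/secret-area/data]"
    // With the regex from the test suite, the file: URI no longer appears.
    println(redact(Some(new Regex("file:/[\\w-_@/]+")), desc))
  }
}
```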


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
