This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 28dad1b  [SPARK-33504][CORE] Sensitive attributes in the application log in the Spark history server should be redacted
28dad1b is described below

commit 28dad1ba770e5b7f7cf542da1ae3f05975a969c6
Author: neko <echoh...@gmail.com>
AuthorDate: Wed Dec 2 09:24:19 2020 -0600

    [SPARK-33504][CORE] Sensitive attributes in the application log in the Spark history server should be redacted
    
    ### What changes were proposed in this pull request?
    Make sure sensitive attributes are redacted in the history server log.
    
    ### Why are the changes needed?
    We found that sensitive attributes, such as passwords, in SparkListenerJobStart and SparkListenerStageSubmitted events were not redacted, so they could be viewed directly.
    A screenshot can be viewed in the attachment of JIRA SPARK-33504.

    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    Manual test works well, and I have also added a unit test case.
    
    Closes #30446 from akiyamaneko/eventlog_unredact.
    
    Authored-by: neko <echoh...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/scheduler/EventLoggingListener.scala     | 24 +++++++-
 .../scheduler/EventLoggingListenerSuite.scala      | 64 +++++++++++++++++++++-
 2 files changed, 85 insertions(+), 3 deletions(-)
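
The heart of the change is the redactProperties helper added in the diff below: the job/stage properties are partitioned into keys that also exist in the SparkConf (which go through Spark's redaction) and custom local properties such as a stage/job description (which are left readable). Here is a minimal standalone sketch of that idea; the names RedactionSketch and sparkConfKeys are illustrative, and the hand-rolled pattern stands in for the private[spark] Utils.redact helper, whose actual pattern comes from spark.redaction.regex:

    import java.util.Properties

    import scala.collection.JavaConverters._

    object RedactionSketch {
      // Assumptions, not Spark internals: an illustrative redaction pattern
      // (the real one is configured via spark.redaction.regex) and the
      // placeholder Spark writes in place of redacted values.
      private val redactionPattern = "(?i)secret|password|token".r
      private val REDACTION_TEXT = "*********(redacted)"

      // sparkConfKeys simulates sparkConf.contains(key): only keys that are
      // also present in the SparkConf are candidates for redaction; custom
      // local properties pass through unchanged.
      def redactProperties(sparkConfKeys: Set[String], properties: Properties): Properties = {
        if (properties == null) return properties
        val redacted = new Properties
        val (globalProps, localProps) =
          properties.asScala.toSeq.partition { case (key, _) => sparkConfKeys.contains(key) }
        val redactedGlobals = globalProps.map {
          case (key, _) if redactionPattern.findFirstIn(key).isDefined => key -> REDACTION_TEXT
          case kv => kv
        }
        (redactedGlobals ++ localProps).foreach { case (k, v) => redacted.setProperty(k, v) }
        redacted
      }

      def main(args: Array[String]): Unit = {
        val props = new Properties
        props.setProperty("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "secret_password")
        props.setProperty("parse_token", "secret_password") // local property, not in the conf
        val out = redactProperties(Set("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD"), props)
        println(out) // the conf-backed key is redacted; the local key keeps its raw value
      }
    }

Note the asymmetry the test below also exercises: parse_token carries a secret-looking value but is not a SparkConf entry, so it is deliberately left as-is.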

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 1fda03f..d4e22d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.scheduler
 
 import java.net.URI
+import java.util.Properties
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -103,7 +105,7 @@ private[spark] class EventLoggingListener(
 
   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
-    logEvent(event)
+    logEvent(event.copy(properties = redactProperties(event.properties)))
     if (shouldLogStageExecutorMetrics) {
       // record the peak metrics for the new stage
       liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
@@ -156,7 +158,9 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }
 
-  override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    logEvent(event.copy(properties = redactProperties(event.properties)), flushLogger = true)
+  }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)
 
@@ -276,6 +280,22 @@ private[spark] class EventLoggingListener(
     logWriter.stop()
   }
 
+  private def redactProperties(properties: Properties): Properties = {
+    if (properties == null) {
+      return properties
+    }
+    val redactedProperties = new Properties
+    // properties may contain some custom local properties such as stage/job description
+    // only properties in sparkConf need to be redacted.
+    val (globalProperties, localProperties) = properties.asScala.toSeq.partition {
+      case (key, _) => sparkConf.contains(key)
+    }
+    (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach {
+      case (key, value) => redactedProperties.setProperty(key, value)
+    }
+    redactedProperties
+  }
+
   private[spark] def redactEvent(
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
     // environmentDetails maps a string descriptor to a set of properties
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c4a8bcb..7acb845 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, InputStream}
-import java.util.Arrays
+import java.util.{Arrays, Properties}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable
@@ -98,6 +98,68 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     assert(redactedProps(key) == "*********(redacted)")
   }
 
+  test("Spark-33504 sensitive attributes redaction in properties") {
+    val (secretKey, secretPassword) = ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "secret_password")
+    val (customKey, customValue) = ("parse_token", "secret_password")
+
+    val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword)
+
+    val properties = new Properties()
+    properties.setProperty(secretKey, secretPassword)
+    properties.setProperty(customKey, customValue)
+
+    val logName = "properties-redaction-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    val stageId = 1
+    val jobId = 1
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    val events = Array(SparkListenerStageSubmitted(stageInfo, properties),
+      SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties))
+
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    val logData = EventLogFileReader.openEventLog(new Path(eventLogger.logWriter.logPath),
+      fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 3)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerStageSubmitted"))
+      assert(lines(2).contains("SparkListenerJobStart"))
+
+      lines.foreach { line =>
+        JsonProtocol.sparkEventFromJson(parse(line)) match {
+          case logStartEvent: SparkListenerLogStart =>
+            assert(logStartEvent == logStart)
+
+          case stageSubmittedEvent: SparkListenerStageSubmitted =>
+            assert(stageSubmittedEvent.properties.getProperty(secretKey) == "*********(redacted)")
+            assert(stageSubmittedEvent.properties.getProperty(customKey) == customValue)
+
+          case jobStartEvent: SparkListenerJobStart =>
+            assert(jobStartEvent.properties.getProperty(secretKey) == "*********(redacted)")
+            assert(jobStartEvent.properties.getProperty(customKey) == customValue)
+
+          case _ => assert(false)
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
   test("Executor metrics update") {
     testStageExecutorMetricsEventLogging()
   }
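
Which conf-backed keys actually get redacted is controlled by spark.redaction.regex, whose default matches key names containing words like secret, password, or token, case-insensitively. Below is a hedged usage sketch, not part of this commit: spark.eventLog.enabled and spark.redaction.regex are real Spark configuration keys, while the object name, app name, and the extended pattern are illustrative assumptions.

    import org.apache.spark.SparkConf

    object RedactionConfSketch {
      // Widen redaction before enabling event logging: also catch keys
      // containing "credstore", in addition to the usual case-insensitive
      // secret/password/token matches.
      val conf: SparkConf = new SparkConf()
        .setAppName("redaction-demo")
        .set("spark.eventLog.enabled", "true")
        .set("spark.redaction.regex", "(?i)secret|password|token|credstore")
    }

This only governs keys present in the SparkConf; as the test above shows, values of arbitrary local properties pass through unredacted.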

