This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e59179b  [SPARK-33504][CORE] The application log in the Spark history 
server contains sensitive attributes should be redacted
e59179b is described below

commit e59179b7326112f526e4c000e21146df283d861c
Author: neko <echoh...@gmail.com>
AuthorDate: Wed Dec 2 09:24:19 2020 -0600

    [SPARK-33504][CORE] The application log in the Spark history server 
contains sensitive attributes should be redacted
    
    ### What changes were proposed in this pull request?
    Make sure that sensitive attributes are redacted in the history server 
log.
    
    ### Why are the changes needed?
    We found that secure attributes such as passwords in SparkListenerJobStart and 
SparkListenerStageSubmitted events were not redacted, so 
sensitive attributes could be viewed directly.
    The screenshot can be viewed in the attachment of JIRA spark-33504
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    Manual testing works well; I have also added a unit test case.
    
    Closes #30446 from akiyamaneko/eventlog_unredact.
    
    Authored-by: neko <echoh...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
    (cherry picked from commit 28dad1ba770e5b7f7cf542da1ae3f05975a969c6)
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/scheduler/EventLoggingListener.scala     | 24 +++++++-
 .../scheduler/EventLoggingListenerSuite.scala      | 64 +++++++++++++++++++++-
 2 files changed, 85 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 24e2a5e..5673c02 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.scheduler
 
 import java.net.URI
+import java.util.Properties
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -103,7 +105,7 @@ private[spark] class EventLoggingListener(
 
   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
-    logEvent(event)
+    logEvent(event.copy(properties = redactProperties(event.properties)))
     if (shouldLogStageExecutorMetrics) {
       // record the peak metrics for the new stage
       liveStageExecutorMetrics.put((event.stageInfo.stageId, 
event.stageInfo.attemptNumber()),
@@ -156,7 +158,9 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }
 
-  override def onJobStart(event: SparkListenerJobStart): Unit = 
logEvent(event, flushLogger = true)
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    logEvent(event.copy(properties = redactProperties(event.properties)), 
flushLogger = true)
+  }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, 
flushLogger = true)
 
@@ -246,6 +250,22 @@ private[spark] class EventLoggingListener(
     logWriter.stop()
   }
 
+  private def redactProperties(properties: Properties): Properties = {
+    if (properties == null) {
+      return properties
+    }
+    val redactedProperties = new Properties
+    // properties may contain some custom local properties such as stage/job 
description
+    // only properties in sparkConf need to be redacted.
+    val (globalProperties, localProperties) = 
properties.asScala.toSeq.partition {
+      case (key, _) => sparkConf.contains(key)
+    }
+    (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach {
+      case (key, value) => redactedProperties.setProperty(key, value)
+    }
+    redactedProperties
+  }
+
   private[spark] def redactEvent(
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = 
{
     // environmentDetails maps a string descriptor to a set of properties
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 046564d..e0e6406 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, InputStream}
-import java.util.Arrays
+import java.util.{Arrays, Properties}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable
@@ -96,6 +96,68 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
     assert(redactedProps(key) == "*********(redacted)")
   }
 
+  test("Spark-33504 sensitive attributes redaction in properties") {
+    val (secretKey, secretPassword) = 
("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "secret_password")
+    val (customKey, customValue) = ("parse_token", "secret_password")
+
+    val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword)
+
+    val properties = new Properties()
+    properties.setProperty(secretKey, secretPassword)
+    properties.setProperty(customKey, customValue)
+
+    val logName = "properties-reaction-test"
+    val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    val stageId = 1
+    val jobId = 1
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    val events = Array(SparkListenerStageSubmitted(stageInfo, properties),
+      SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties))
+
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    val logData = EventLogFileReader.openEventLog(new 
Path(eventLogger.logWriter.logPath),
+      fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 3)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerStageSubmitted"))
+      assert(lines(2).contains("SparkListenerJobStart"))
+
+      lines.foreach{
+        line => JsonProtocol.sparkEventFromJson(parse(line)) match {
+          case logStartEvent: SparkListenerLogStart =>
+            assert(logStartEvent == logStart)
+
+          case stageSubmittedEvent: SparkListenerStageSubmitted =>
+            assert(stageSubmittedEvent.properties.getProperty(secretKey) == 
"*********(redacted)")
+            assert(stageSubmittedEvent.properties.getProperty(customKey) ==  
customValue)
+
+          case jobStartEvent : SparkListenerJobStart =>
+            assert(jobStartEvent.properties.getProperty(secretKey) == 
"*********(redacted)")
+            assert(jobStartEvent.properties.getProperty(customKey) ==  
customValue)
+
+          case _ => assert(false)
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
   test("Executor metrics update") {
     testStageExecutorMetricsEventLogging()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to