This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3fb9f6f  Revert "[SPARK-33504][CORE] The application log in the Spark 
history server contains sensitive attributes should be redacted"
3fb9f6f is described below

commit 3fb9f6f670328d31bf24fcb6b805715f5828ce06
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Wed Dec 2 14:38:19 2020 -0600

    Revert "[SPARK-33504][CORE] The application log in the Spark history server 
contains sensitive attributes should be redacted"
    
    ### What changes were proposed in this pull request?
    
    Revert SPARK-33504 on branch-3.0 compilation error. Original PR 
https://github.com/apache/spark/pull/30446
    
    This reverts commit e59179b7326112f526e4c000e21146df283d861c.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #30576 from tgravescs/revert33504.
    
    Authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/scheduler/EventLoggingListener.scala     | 24 +-------
 .../scheduler/EventLoggingListenerSuite.scala      | 64 +---------------------
 2 files changed, 3 insertions(+), 85 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5673c02..24e2a5e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,9 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.net.URI
-import java.util.Properties
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -105,7 +103,7 @@ private[spark] class EventLoggingListener(
 
   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
-    logEvent(event.copy(properties = redactProperties(event.properties)))
+    logEvent(event)
     if (shouldLogStageExecutorMetrics) {
       // record the peak metrics for the new stage
       liveStageExecutorMetrics.put((event.stageInfo.stageId, 
event.stageInfo.attemptNumber()),
@@ -158,9 +156,7 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }
 
-  override def onJobStart(event: SparkListenerJobStart): Unit = {
-    logEvent(event.copy(properties = redactProperties(event.properties)), 
flushLogger = true)
-  }
+  override def onJobStart(event: SparkListenerJobStart): Unit = 
logEvent(event, flushLogger = true)
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, 
flushLogger = true)
 
@@ -250,22 +246,6 @@ private[spark] class EventLoggingListener(
     logWriter.stop()
   }
 
-  private def redactProperties(properties: Properties): Properties = {
-    if (properties == null) {
-      return properties
-    }
-    val redactedProperties = new Properties
-    // properties may contain some custom local properties such as stage/job 
description
-    // only properties in sparkConf need to be redacted.
-    val (globalProperties, localProperties) = 
properties.asScala.toSeq.partition {
-      case (key, _) => sparkConf.contains(key)
-    }
-    (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach {
-      case (key, value) => redactedProperties.setProperty(key, value)
-    }
-    redactedProperties
-  }
-
   private[spark] def redactEvent(
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = 
{
     // environmentDetails maps a string descriptor to a set of properties
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index e0e6406..046564d 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, InputStream}
-import java.util.{Arrays, Properties}
+import java.util.Arrays
 
 import scala.collection.immutable.Map
 import scala.collection.mutable
@@ -96,68 +96,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
     assert(redactedProps(key) == "*********(redacted)")
   }
 
-  test("Spark-33504 sensitive attributes redaction in properties") {
-    val (secretKey, secretPassword) = 
("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
-      "secret_password")
-    val (customKey, customValue) = ("parse_token", "secret_password")
-
-    val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword)
-
-    val properties = new Properties()
-    properties.setProperty(secretKey, secretPassword)
-    properties.setProperty(customKey, customValue)
-
-    val logName = "properties-reaction-test"
-    val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
-    val listenerBus = new LiveListenerBus(conf)
-
-    val stageId = 1
-    val jobId = 1
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0,
-      Seq.empty, Seq.empty, "details",
-      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-
-    val events = Array(SparkListenerStageSubmitted(stageInfo, properties),
-      SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties))
-
-    eventLogger.start()
-    listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
-    listenerBus.addToEventLogQueue(eventLogger)
-    events.foreach(event => listenerBus.post(event))
-    listenerBus.stop()
-    eventLogger.stop()
-
-    val logData = EventLogFileReader.openEventLog(new 
Path(eventLogger.logWriter.logPath),
-      fileSystem)
-    try {
-      val lines = readLines(logData)
-      val logStart = SparkListenerLogStart(SPARK_VERSION)
-      assert(lines.size === 3)
-      assert(lines(0).contains("SparkListenerLogStart"))
-      assert(lines(1).contains("SparkListenerStageSubmitted"))
-      assert(lines(2).contains("SparkListenerJobStart"))
-
-      lines.foreach{
-        line => JsonProtocol.sparkEventFromJson(parse(line)) match {
-          case logStartEvent: SparkListenerLogStart =>
-            assert(logStartEvent == logStart)
-
-          case stageSubmittedEvent: SparkListenerStageSubmitted =>
-            assert(stageSubmittedEvent.properties.getProperty(secretKey) == 
"*********(redacted)")
-            assert(stageSubmittedEvent.properties.getProperty(customKey) ==  
customValue)
-
-          case jobStartEvent : SparkListenerJobStart =>
-            assert(jobStartEvent.properties.getProperty(secretKey) == 
"*********(redacted)")
-            assert(jobStartEvent.properties.getProperty(customKey) ==  
customValue)
-
-          case _ => assert(false)
-        }
-      }
-    } finally {
-      logData.close()
-    }
-  }
-
   test("Executor metrics update") {
     testStageExecutorMetricsEventLogging()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to