This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 3fb9f6f Revert "[SPARK-33504][CORE] The application log in the Spark history server contains sensitive attributes should be redacted" 3fb9f6f is described below commit 3fb9f6f670328d31bf24fcb6b805715f5828ce06 Author: Thomas Graves <tgra...@nvidia.com> AuthorDate: Wed Dec 2 14:38:19 2020 -0600 Revert "[SPARK-33504][CORE] The application log in the Spark history server contains sensitive attributes should be redacted" ### What changes were proposed in this pull request? Revert SPARK-33504 on branch-3.0 due to a compilation error. Original PR https://github.com/apache/spark/pull/30446 This reverts commit e59179b7326112f526e4c000e21146df283d861c. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #30576 from tgravescs/revert33504. Authored-by: Thomas Graves <tgra...@nvidia.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../spark/scheduler/EventLoggingListener.scala | 24 +------- .../scheduler/EventLoggingListenerSuite.scala | 64 +--------------------- 2 files changed, 3 insertions(+), 85 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 5673c02..24e2a5e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -18,9 +18,7 @@ package org.apache.spark.scheduler import java.net.URI -import java.util.Properties -import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.conf.Configuration @@ -105,7 +103,7 @@ private[spark] class EventLoggingListener( // Events that do not trigger a flush override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { - logEvent(event.copy(properties = redactProperties(event.properties))) + 
logEvent(event) if (shouldLogStageExecutorMetrics) { // record the peak metrics for the new stage liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()), @@ -158,9 +156,7 @@ private[spark] class EventLoggingListener( logEvent(event, flushLogger = true) } - override def onJobStart(event: SparkListenerJobStart): Unit = { - logEvent(event.copy(properties = redactProperties(event.properties)), flushLogger = true) - } + override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true) override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true) @@ -250,22 +246,6 @@ private[spark] class EventLoggingListener( logWriter.stop() } - private def redactProperties(properties: Properties): Properties = { - if (properties == null) { - return properties - } - val redactedProperties = new Properties - // properties may contain some custom local properties such as stage/job description - // only properties in sparkConf need to be redacted. 
- val (globalProperties, localProperties) = properties.asScala.toSeq.partition { - case (key, _) => sparkConf.contains(key) - } - (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach { - case (key, value) => redactedProperties.setProperty(key, value) - } - redactedProperties - } - private[spark] def redactEvent( event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = { // environmentDetails maps a string descriptor to a set of properties diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index e0e6406..046564d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.io.{File, InputStream} -import java.util.{Arrays, Properties} +import java.util.Arrays import scala.collection.immutable.Map import scala.collection.mutable @@ -96,68 +96,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit assert(redactedProps(key) == "*********(redacted)") } - test("Spark-33504 sensitive attributes redaction in properties") { - val (secretKey, secretPassword) = ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", - "secret_password") - val (customKey, customValue) = ("parse_token", "secret_password") - - val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword) - - val properties = new Properties() - properties.setProperty(secretKey, secretPassword) - properties.setProperty(customKey, customValue) - - val logName = "properties-reaction-test" - val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus(conf) - - val stageId = 1 - val jobId = 1 - val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, 
"details", - resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) - - val events = Array(SparkListenerStageSubmitted(stageInfo, properties), - SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties)) - - eventLogger.start() - listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) - listenerBus.addToEventLogQueue(eventLogger) - events.foreach(event => listenerBus.post(event)) - listenerBus.stop() - eventLogger.stop() - - val logData = EventLogFileReader.openEventLog(new Path(eventLogger.logWriter.logPath), - fileSystem) - try { - val lines = readLines(logData) - val logStart = SparkListenerLogStart(SPARK_VERSION) - assert(lines.size === 3) - assert(lines(0).contains("SparkListenerLogStart")) - assert(lines(1).contains("SparkListenerStageSubmitted")) - assert(lines(2).contains("SparkListenerJobStart")) - - lines.foreach{ - line => JsonProtocol.sparkEventFromJson(parse(line)) match { - case logStartEvent: SparkListenerLogStart => - assert(logStartEvent == logStart) - - case stageSubmittedEvent: SparkListenerStageSubmitted => - assert(stageSubmittedEvent.properties.getProperty(secretKey) == "*********(redacted)") - assert(stageSubmittedEvent.properties.getProperty(customKey) == customValue) - - case jobStartEvent : SparkListenerJobStart => - assert(jobStartEvent.properties.getProperty(secretKey) == "*********(redacted)") - assert(jobStartEvent.properties.getProperty(customKey) == customValue) - - case _ => assert(false) - } - } - } finally { - logData.close() - } - } - test("Executor metrics update") { testStageExecutorMetricsEventLogging() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org