This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a9350739fa [SPARK-40640][CORE] SparkHadoopUtil to set origin of hadoop/hive config options
4a9350739fa is described below

commit 4a9350739fa3aee75932b3b0cf2a8d867db801a4
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Thu Oct 13 12:28:35 2022 -0700

    [SPARK-40640][CORE] SparkHadoopUtil to set origin of hadoop/hive config options
    
    ### What changes were proposed in this pull request?
    
    The options passed down from the Spark conf, hive-site.xml, and the AWS environment variables now all record their origin in the source attribute of the Configuration entries.
    
    The Configuration Writable methods do not propagate this, so it is not as useful cluster-wide as it could be, but it does help with basic troubleshooting.
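    
    As an illustrative sketch (not part of the patch itself; a standalone, REPL-style Hadoop `Configuration` is assumed here, whereas Spark does the equivalent inside SparkHadoopUtil when it builds the Hadoop conf), the three-argument `Configuration.set(key, value, source)` overload is what records the origin, and `Configuration.getPropertySources(key)` returns it:
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    
    // minimal sketch: record an origin the same way the patch does,
    // via the three-argument set(key, value, source) overload
    val hadoopConf = new Configuration(false)
    hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com", "Set by Spark to default values")
    
    // getPropertySources returns the recorded origins, or null if nothing was recorded
    val sources = hadoopConf.getPropertySources("fs.s3a.endpoint")
    println(Option(sources).map(_.mkString(",")).getOrElse(""))
    // expected to print: Set by Spark to default values
    ```
    
    Spark-internal code can also go through the new `SparkHadoopUtil.propertySources(hadoopConf, key)` helper added below, which returns the same information as a comma-separated string, or "" when no source is recorded.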
    
    ### Why are the changes needed?
    
    Helps when troubleshooting how options make their way down into the Hadoop configuration;
    the recorded sources can be examined and logged later.
    
    For example, my cloudstore diagnostics JAR can do this in its storediag command
    and in an s3a AWS credential provider. I may add some of that logging
    at debug level to the ASF Hadoop implementations.
    
    https://github.com/steveloughran/cloudstore
    
    ### Does this PR introduce _any_ user-facing change?
    
    Not *really*. It's a very low-level diagnostics feature in the Hadoop configuration classes.
    
    ### How was this patch tested?
    
    New tests added; existing tests enhanced.
    
    Closes #38084 from steveloughran/SPARK-40640-spark-conf-propagation.
    
    Lead-authored-by: Steve Loughran <ste...@cloudera.com>
    Co-authored-by: Steve Loughran <ste...@apache.org>
    Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  | 151 +++++++++++++++++----
 .../apache/spark/deploy/SparkHadoopUtilSuite.scala |  95 +++++++++++--
 .../spark/sql/hive/client/HiveClientImpl.scala     |   5 +-
 3 files changed, 215 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ad456fb0ee9..8532246dc9e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.net.InetAddress
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Date, Locale}
@@ -415,6 +416,58 @@ private[spark] object SparkHadoopUtil extends Logging {
    */
   private[spark] val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
 
+  /**
+   * Source for hive-site.xml configuration options.
+   */
+  private[deploy] val SOURCE_HIVE_SITE = "Set by Spark from hive-site.xml"
+
+  /**
+   * Source for configuration options set by spark when another source is
+   * not explicitly declared.
+   */
+  private[spark] val SOURCE_SPARK = "Set by Spark"
+
+  /**
+   * Source for configuration options with `spark.hadoop.` prefix copied
+   * from spark-defaults.
+   */
+  private[deploy] val SOURCE_SPARK_HADOOP =
+    "Set by Spark from keys starting with 'spark.hadoop'"
+
+  /*
+   * The AWS Authentication environment variables documented in
+   * https://docs.aws.amazon.com/sdkref/latest/guide/environment-variables.html.
+   * There are alternative names defined in `com.amazonaws.SDKGlobalConfiguration`
+   * and which are picked up by the authentication provider
+   * `EnvironmentVariableCredentialsProvider`; those are not propagated.
+   */
+
+  /**
+   * AWS Access key.
+   */
+  private[deploy] val ENV_VAR_AWS_ACCESS_KEY = "AWS_ACCESS_KEY_ID"
+
+  /**
+   * AWS Secret Key.
+   */
+  private[deploy] val ENV_VAR_AWS_SECRET_KEY = "AWS_SECRET_ACCESS_KEY"
+
+  /**
+   * AWS Session token.
+   */
+  private[deploy] val ENV_VAR_AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN"
+
+  /**
+   * Source for configuration options with `spark.hive.` prefix copied
+   * from spark-defaults.
+   */
+  private[deploy] val SOURCE_SPARK_HIVE = "Set by Spark from keys starting with 'spark.hive'"
+
+  /**
+   * Hadoop configuration options set to their default values.
+   */
+  private[deploy] val SET_TO_DEFAULT_VALUES = "Set by Spark to default values"
+
   def get: SparkHadoopUtil = instance
 
   /**
@@ -437,27 +490,52 @@ private[spark] object SparkHadoopUtil extends Logging {
     // Note: this null check is around more than just access to the "conf" object to maintain
     // the behavior of the old implementation of this code, for backwards compatibility.
     if (conf != null) {
-      // Explicitly check for S3 environment variables
-      val keyId = System.getenv("AWS_ACCESS_KEY_ID")
-      val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
-      if (keyId != null && accessKey != null) {
-        hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
-        hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
-        hadoopConf.set("fs.s3a.access.key", keyId)
-        hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
-        hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
-        hadoopConf.set("fs.s3a.secret.key", accessKey)
-
-        val sessionToken = System.getenv("AWS_SESSION_TOKEN")
-        if (sessionToken != null) {
-          hadoopConf.set("fs.s3a.session.token", sessionToken)
-        }
-      }
+      appendS3CredentialsFromEnvironment(hadoopConf,
+        System.getenv(ENV_VAR_AWS_ACCESS_KEY),
+        System.getenv(ENV_VAR_AWS_SECRET_KEY),
+        System.getenv(ENV_VAR_AWS_SESSION_TOKEN))
       appendHiveConfigs(hadoopConf)
       appendSparkHadoopConfigs(conf, hadoopConf)
       appendSparkHiveConfigs(conf, hadoopConf)
       val bufferSize = conf.get(BUFFER_SIZE).toString
-      hadoopConf.set("io.file.buffer.size", bufferSize)
+      hadoopConf.set("io.file.buffer.size", bufferSize, BUFFER_SIZE.key)
+    }
+  }
+
+  /**
+   * Append any AWS secrets from the environment variables
+   * if both `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are set.
+   * If these two are set and `AWS_SESSION_TOKEN` is also set,
+   * then `fs.s3a.session.token`.
+   * The option is set with a source string which includes the hostname
+   * on which it was set. This can help debug propagation issues.
+   *
+   * @param hadoopConf configuration to patch
+   * @param keyId key ID or null
+   * @param accessKey secret key
+   * @param sessionToken session token.
+   */
+  // Exposed for testing
+  private[deploy] def appendS3CredentialsFromEnvironment(
+      hadoopConf: Configuration,
+      keyId: String,
+      accessKey: String,
+      sessionToken: String): Unit = {
+    if (keyId != null && accessKey != null) {
+      // source prefix string; will have environment variable added
+      val source = SOURCE_SPARK + " on " + InetAddress.getLocalHost.toString + " from "
+      hadoopConf.set("fs.s3.awsAccessKeyId", keyId, source + ENV_VAR_AWS_ACCESS_KEY)
+      hadoopConf.set("fs.s3n.awsAccessKeyId", keyId, source + ENV_VAR_AWS_ACCESS_KEY)
+      hadoopConf.set("fs.s3a.access.key", keyId, source + ENV_VAR_AWS_ACCESS_KEY)
+      hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey, source + ENV_VAR_AWS_SECRET_KEY)
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey, source + ENV_VAR_AWS_SECRET_KEY)
+      hadoopConf.set("fs.s3a.secret.key", accessKey, source + ENV_VAR_AWS_SECRET_KEY)
+
+      // look for session token if the other variables were set
+      if (sessionToken != null) {
+        hadoopConf.set("fs.s3a.session.token", sessionToken,
+          source + ENV_VAR_AWS_SESSION_TOKEN)
+      }
     }
   }
 
@@ -474,40 +552,61 @@ private[spark] object SparkHadoopUtil extends Logging {
 
   private def appendHiveConfigs(hadoopConf: Configuration): Unit = {
     hiveConfKeys.foreach { kv =>
-      hadoopConf.set(kv.getKey, kv.getValue)
+      hadoopConf.set(kv.getKey, kv.getValue, SOURCE_HIVE_SITE)
     }
   }
 
   private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
     // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
     for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
-      hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      hadoopConf.set(key.substring("spark.hadoop.".length), value,
+        SOURCE_SPARK_HADOOP)
     }
+    val setBySpark = SET_TO_DEFAULT_VALUES
     if (conf.getOption("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version").isEmpty) {
-      hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1")
+      hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1", 
setBySpark)
     }
-    // Since Hadoop 3.3.1, HADOOP-17597 starts to throw exceptions by default
+    // In Hadoop 3.3.1, HADOOP-17597 starts to throw exceptions by default
+    // this has been reverted in 3.3.2 (HADOOP-17928); setting it to
+    // true here is harmless
     if (conf.getOption("spark.hadoop.fs.s3a.downgrade.syncable.exceptions").isEmpty) {
-      hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true")
+      hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true", 
setBySpark)
     }
     // In Hadoop 3.3.1, AWS region handling with the default "" endpoint only works
     // in EC2 deployments or when the AWS CLI is installed.
     // The workaround is to set the name of the S3 endpoint explicitly,
     // if not already set. See HADOOP-17771.
-    // This change is harmless on older versions and compatible with
-    // later Hadoop releases
     if (hadoopConf.get("fs.s3a.endpoint", "").isEmpty &&
       hadoopConf.get("fs.s3a.endpoint.region") == null) {
       // set to US central endpoint which can also connect to buckets
       // in other regions at the expense of a HEAD request during fs creation
-      hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com")
+      hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com", setBySpark)
     }
   }
 
   private def appendSparkHiveConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
     // Copy any "spark.hive.foo=bar" spark properties into conf as "hive.foo=bar"
     for ((key, value) <- conf.getAll if key.startsWith("spark.hive.")) {
-      hadoopConf.set(key.substring("spark.".length), value)
+      hadoopConf.set(key.substring("spark.".length), value, SOURCE_SPARK_HIVE)
+    }
+  }
+
+  /**
+   * Extract the sources of a configuration key, or a default value if
+   * the key is not found or it has no known sources.
+   * Note that options provided by credential providers (JCEKS stores etc)
+   * are not resolved, so values retrieved by Configuration.getPassword()
+   * may not be recorded as having an origin.
+   * @param hadoopConf hadoop configuration to examine.
+   * @param key key to look up
+   * @return the origin of the current entry in the configuration, or the empty string.
+   */
+  def propertySources(hadoopConf: Configuration, key: String): String = {
+    val sources = hadoopConf.getPropertySources(key)
+    if (sources != null && sources.nonEmpty) {
+      sources.mkString(",")
+    } else {
+      ""
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
index 2aa125ef2a3..17f1476cd8d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.deploy
 
+import java.net.InetAddress
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil.{SET_TO_DEFAULT_VALUES, SOURCE_SPARK_HADOOP, SOURCE_SPARK_HIVE}
+import org.apache.spark.internal.config.BUFFER_SIZE
 
 class SparkHadoopUtilSuite extends SparkFunSuite {
 
@@ -32,9 +36,10 @@ class SparkHadoopUtilSuite extends SparkFunSuite {
     val hadoopConf = new Configuration(false)
     sc.set("spark.hadoop.orc.filterPushdown", "true")
     new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
-    assertConfigValue(hadoopConf, "orc.filterPushdown", "true" )
-    assertConfigValue(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", 
"true")
-    assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com")
+    assertConfigMatches(hadoopConf, "orc.filterPushdown", "true", 
SOURCE_SPARK_HADOOP)
+    assertConfigMatches(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", 
"true",
+      SET_TO_DEFAULT_VALUES)
+    assertConfigMatches(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com", 
SET_TO_DEFAULT_VALUES)
   }
 
   /**
@@ -46,7 +51,7 @@ class SparkHadoopUtilSuite extends SparkFunSuite {
     val hadoopConf = new Configuration(false)
     sc.set("spark.hadoop.fs.s3a.endpoint", "")
     new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf)
-    assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com")
+    assertConfigMatches(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com", 
SET_TO_DEFAULT_VALUES)
   }
 
   /**
@@ -67,7 +72,7 @@ class SparkHadoopUtilSuite extends SparkFunSuite {
    * If the endpoint region is set (even to a blank string) in
    * "spark.hadoop.fs.s3a.endpoint.region" then the endpoint is not set,
    * even when the s3a endpoint is "".
-   * This supports a feature in later hadoop versions where this configuration
+   * This supports a feature in hadoop 3.3.1 where this configuration
    * pair triggers a revert to the "SDK to work out the region" algorithm,
    * which works on EC2 deployments.
    */
@@ -80,6 +85,44 @@ class SparkHadoopUtilSuite extends SparkFunSuite {
     assertConfigValue(hadoopConf, "fs.s3a.endpoint", null)
   }
 
+  /**
+   * spark.hive.* is passed to the hadoop config as hive.*.
+   */
+  test("SPARK-40640: spark.hive propagation") {
+    val sc = new SparkConf()
+    val hadoopConf = new Configuration(false)
+    sc.set("spark.hive.hiveoption", "value")
+    new SparkHadoopUtil().appendS3AndSparkHadoopHiveConfigurations(sc, hadoopConf)
+    assertConfigMatches(hadoopConf, "hive.hiveoption", "value", SOURCE_SPARK_HIVE)
+  }
+
+  /**
+   * The explicit buffer size propagation records this.
+   */
+  test("SPARK-40640: buffer size propagation") {
+    val sc = new SparkConf()
+    val hadoopConf = new Configuration(false)
+    sc.set(BUFFER_SIZE.key, "123")
+    new SparkHadoopUtil().appendS3AndSparkHadoopHiveConfigurations(sc, hadoopConf)
+    assertConfigMatches(hadoopConf, "io.file.buffer.size", "123", BUFFER_SIZE.key)
+  }
+
+  test("SPARK-40640: aws credentials from environment variables") {
+    val hadoopConf = new Configuration(false)
+    SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf,
+      "access-key", "secret-key", "session-token")
+    val source = "Set by Spark on " + InetAddress.getLocalHost + " from "
+    assertConfigMatches(hadoopConf, "fs.s3a.access.key", "access-key", source)
+    assertConfigMatches(hadoopConf, "fs.s3a.secret.key", "secret-key", source)
+    assertConfigMatches(hadoopConf, "fs.s3a.session.token", "session-token", 
source)
+  }
+
+  test("SPARK-19739: S3 session token propagation requires access and secret 
keys") {
+    val hadoopConf = new Configuration(false)
+    SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf, null, null, "session-token")
+    assertConfigValue(hadoopConf, "fs.s3a.session.token", null)
+  }
+
   /**
    * Assert that a hadoop configuration option has the expected value.
    * @param hadoopConf configuration to query
@@ -87,10 +130,46 @@ class SparkHadoopUtilSuite extends SparkFunSuite {
    * @param expected expected value.
    */
   private def assertConfigValue(
-    hadoopConf: Configuration,
-    key: String,
-    expected: String): Unit = {
+      hadoopConf: Configuration,
+      key: String,
+      expected: String): Unit = {
     assert(hadoopConf.get(key) === expected,
       s"Mismatch in expected value of $key")
   }
+
+  /**
+   * Assert that a hadoop configuration option has the expected value
+   * and has the expected source.
+   *
+   * @param hadoopConf configuration to query
+   * @param key        key to look up
+   * @param expected   expected value.
+   * @param expectedSource string required to be in the property source string
+   */
+  private def assertConfigMatches(
+      hadoopConf: Configuration,
+      key: String,
+      expected: String,
+      expectedSource: String): Unit = {
+    assertConfigValue(hadoopConf, key, expected)
+    assertConfigSourceContains(hadoopConf, key, expectedSource)
+  }
+
+  /**
+   * Assert that a source of a configuration matches a specific string.
+   * @param hadoopConf hadoop configuration
+   * @param key key to probe
+   * @param expectedSource expected source
+   */
+  private def assertConfigSourceContains(
+      hadoopConf: Configuration,
+      key: String,
+      expectedSource: String): Unit = {
+    val v = hadoopConf.get(key)
+    // get the source list
+    val origin = SparkHadoopUtil.propertySources(hadoopConf, key)
+    assert(origin.nonEmpty, s"Sources are missing for '$key' with value '$v'")
+    assert(origin.contains(expectedSource),
+      s"Expected source $key with value $v: and source $origin to contain 
$expectedSource")
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index db600bcd3d4..f6dc35131b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -1303,7 +1304,7 @@ private[hive] object HiveClientImpl extends Logging {
     // 3: we set all entries in config to this hiveConf.
     val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
       sparkConf.getAll.toMap ++ extraConfig).toMap
-    confMap.foreach { case (k, v) => hiveConf.set(k, v) }
+    confMap.foreach { case (k, v) => hiveConf.set(k, v, SOURCE_SPARK) }
     SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
       logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive 
Conf:$k=$v")
     }
@@ -1321,7 +1322,7 @@ private[hive] object HiveClientImpl extends Logging {
     if (hiveConf.get("hive.execution.engine") == "tez") {
       logWarning("Detected HiveConf hive.execution.engine is 'tez' and will be 
reset to 'mr'" +
         " to disable useless hive logic")
-      hiveConf.set("hive.execution.engine", "mr")
+      hiveConf.set("hive.execution.engine", "mr", SOURCE_SPARK)
     }
     hiveConf
   }

