This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4a9350739fa [SPARK-40640][CORE] SparkHadoopUtil to set origin of hadoop/hive config options 4a9350739fa is described below commit 4a9350739fa3aee75932b3b0cf2a8d867db801a4 Author: Steve Loughran <ste...@cloudera.com> AuthorDate: Thu Oct 13 12:28:35 2022 -0700 [SPARK-40640][CORE] SparkHadoopUtil to set origin of hadoop/hive config options ### What changes were proposed in this pull request? The options passed from spark conf, hive-site.xml, AWS env vars now all record this in their source attribute of the entries. The Configuration Writable methods do not propagate this, so it is not as useful cluster-wide as it could be. It does help with some of the basic troubleshooting. ### Why are the changes needed? Helps when troubleshooting where options make their way down. These can be examined and logged later. For example, my cloudstore diagnostics JAR can do this in its storediag command and in an s3a AWS credential provider. I may add some of that logging at debug to the ASF hadoop implementations. https://github.com/steveloughran/cloudstore ### Does this PR introduce _any_ user-facing change? Not *really*. It's a very low level diagnostics feature in the Hadoop configuration classes. ### How was this patch tested? New tests added; existing tests enhanced. Closes #38084 from steveloughran/SPARK-40640-spark-conf-propagation. 
Lead-authored-by: Steve Loughran <ste...@cloudera.com> Co-authored-by: Steve Loughran <ste...@apache.org> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 151 +++++++++++++++++---- .../apache/spark/deploy/SparkHadoopUtilSuite.scala | 95 +++++++++++-- .../spark/sql/hive/client/HiveClientImpl.scala | 5 +- 3 files changed, 215 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index ad456fb0ee9..8532246dc9e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException} +import java.net.InetAddress import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Date, Locale} @@ -415,6 +416,58 @@ private[spark] object SparkHadoopUtil extends Logging { */ private[spark] val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml" + /** + * Source for hive-site.xml configuration options. + */ + private[deploy] val SOURCE_HIVE_SITE = "Set by Spark from hive-site.xml" + + /** + * Source for configuration options set by spark when another source is + * not explicitly declared. + */ + private[spark] val SOURCE_SPARK = "Set by Spark" + + /** + * Source for configuration options with `spark.hadoop.` prefix copied + * from spark-defaults. + */ + private[deploy] val SOURCE_SPARK_HADOOP = + "Set by Spark from keys starting with 'spark.hadoop'" + + /* + * The AWS Authentication environment variables documented in + * https://docs.aws.amazon.com/sdkref/latest/guide/environment-variables.html. 
+ * There are alternative names defined in `com.amazonaws.SDKGlobalConfiguration` + * and which are picked up by the authentication provider + * `EnvironmentVariableCredentialsProvider`; those are not propagated. + */ + + /** + * AWS Access key. + */ + private[deploy] val ENV_VAR_AWS_ACCESS_KEY = "AWS_ACCESS_KEY_ID" + + /** + * AWS Secret Key. + */ + private[deploy] val ENV_VAR_AWS_SECRET_KEY = "AWS_SECRET_ACCESS_KEY" + + /** + * AWS Session token. + */ + private[deploy] val ENV_VAR_AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN" + + /** + * Source for configuration options with `spark.hive.` prefix copied + * from spark-defaults. + */ + private[deploy] val SOURCE_SPARK_HIVE = "Set by Spark from keys starting with 'spark.hive'" + + /** + * Hadoop configuration options set to their default values. + */ + private[deploy] val SET_TO_DEFAULT_VALUES = "Set by Spark to default values" + def get: SparkHadoopUtil = instance /** @@ -437,27 +490,52 @@ private[spark] object SparkHadoopUtil extends Logging { // Note: this null check is around more than just access to the "conf" object to maintain // the behavior of the old implementation of this code, for backwards compatibility. 
if (conf != null) { - // Explicitly check for S3 environment variables - val keyId = System.getenv("AWS_ACCESS_KEY_ID") - val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY") - if (keyId != null && accessKey != null) { - hadoopConf.set("fs.s3.awsAccessKeyId", keyId) - hadoopConf.set("fs.s3n.awsAccessKeyId", keyId) - hadoopConf.set("fs.s3a.access.key", keyId) - hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey) - hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey) - hadoopConf.set("fs.s3a.secret.key", accessKey) - - val sessionToken = System.getenv("AWS_SESSION_TOKEN") - if (sessionToken != null) { - hadoopConf.set("fs.s3a.session.token", sessionToken) - } - } + appendS3CredentialsFromEnvironment(hadoopConf, + System.getenv(ENV_VAR_AWS_ACCESS_KEY), + System.getenv(ENV_VAR_AWS_SECRET_KEY), + System.getenv(ENV_VAR_AWS_SESSION_TOKEN)) appendHiveConfigs(hadoopConf) appendSparkHadoopConfigs(conf, hadoopConf) appendSparkHiveConfigs(conf, hadoopConf) val bufferSize = conf.get(BUFFER_SIZE).toString - hadoopConf.set("io.file.buffer.size", bufferSize) + hadoopConf.set("io.file.buffer.size", bufferSize, BUFFER_SIZE.key) + } + } + + /** + * Append any AWS secrets from the environment variables + * if both `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are set. + * If these two are set and `AWS_SESSION_TOKEN` is also set, + * then `fs.s3a.session.token`. + * The option is set with a source string which includes the hostname + * on which it was set. This can help debug propagation issues. + * + * @param hadoopConf configuration to patch + * @param keyId key ID or null + * @param accessKey secret key + * @param sessionToken session token. 
+ */ + // Exposed for testing + private[deploy] def appendS3CredentialsFromEnvironment( + hadoopConf: Configuration, + keyId: String, + accessKey: String, + sessionToken: String): Unit = { + if (keyId != null && accessKey != null) { + // source prefix string; will have environment variable added + val source = SOURCE_SPARK + " on " + InetAddress.getLocalHost.toString + " from " + hadoopConf.set("fs.s3.awsAccessKeyId", keyId, source + ENV_VAR_AWS_ACCESS_KEY) + hadoopConf.set("fs.s3n.awsAccessKeyId", keyId, source + ENV_VAR_AWS_ACCESS_KEY) + hadoopConf.set("fs.s3a.access.key", keyId, source + ENV_VAR_AWS_ACCESS_KEY) + hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey, source + ENV_VAR_AWS_SECRET_KEY) + hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey, source + ENV_VAR_AWS_SECRET_KEY) + hadoopConf.set("fs.s3a.secret.key", accessKey, source + ENV_VAR_AWS_SECRET_KEY) + + // look for session token if the other variables were set + if (sessionToken != null) { + hadoopConf.set("fs.s3a.session.token", sessionToken, + source + ENV_VAR_AWS_SESSION_TOKEN) + } } } @@ -474,40 +552,61 @@ private[spark] object SparkHadoopUtil extends Logging { private def appendHiveConfigs(hadoopConf: Configuration): Unit = { hiveConfKeys.foreach { kv => - hadoopConf.set(kv.getKey, kv.getValue) + hadoopConf.set(kv.getKey, kv.getValue, SOURCE_HIVE_SITE) } } private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) + hadoopConf.set(key.substring("spark.hadoop.".length), value, + SOURCE_SPARK_HADOOP) } + val setBySpark = SET_TO_DEFAULT_VALUES if (conf.getOption("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version").isEmpty) { - hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1") + 
hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1", setBySpark) } - // Since Hadoop 3.3.1, HADOOP-17597 starts to throw exceptions by default + // In Hadoop 3.3.1, HADOOP-17597 starts to throw exceptions by default + // this has been reverted in 3.3.2 (HADOOP-17928); setting it to + // true here is harmless if (conf.getOption("spark.hadoop.fs.s3a.downgrade.syncable.exceptions").isEmpty) { - hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true") + hadoopConf.set("fs.s3a.downgrade.syncable.exceptions", "true", setBySpark) } // In Hadoop 3.3.1, AWS region handling with the default "" endpoint only works // in EC2 deployments or when the AWS CLI is installed. // The workaround is to set the name of the S3 endpoint explicitly, // if not already set. See HADOOP-17771. - // This change is harmless on older versions and compatible with - // later Hadoop releases if (hadoopConf.get("fs.s3a.endpoint", "").isEmpty && hadoopConf.get("fs.s3a.endpoint.region") == null) { // set to US central endpoint which can also connect to buckets // in other regions at the expense of a HEAD request during fs creation - hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com") + hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com", setBySpark) } } private def appendSparkHiveConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { // Copy any "spark.hive.foo=bar" spark properties into conf as "hive.foo=bar" for ((key, value) <- conf.getAll if key.startsWith("spark.hive.")) { - hadoopConf.set(key.substring("spark.".length), value) + hadoopConf.set(key.substring("spark.".length), value, SOURCE_SPARK_HIVE) + } + } + + /** + * Extract the sources of a configuration key, or a default value if + * the key is not found or it has no known sources. + * Note that options provided by credential providers (JCEKS stores etc) + * are not resolved, so values retrieved by Configuration.getPassword() + * may not be recorded as having an origin. 
+ * @param hadoopConf hadoop configuration to examine. + * @param key key to look up + * @return the origin of the current entry in the configuration, or the empty string. + */ + def propertySources(hadoopConf: Configuration, key: String): String = { + val sources = hadoopConf.getPropertySources(key) + if (sources != null && sources.nonEmpty) { + sources.mkString(",") + } else { + "" } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala index 2aa125ef2a3..17f1476cd8d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala @@ -17,9 +17,13 @@ package org.apache.spark.deploy +import java.net.InetAddress + import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil.{SET_TO_DEFAULT_VALUES, SOURCE_SPARK_HADOOP, SOURCE_SPARK_HIVE} +import org.apache.spark.internal.config.BUFFER_SIZE class SparkHadoopUtilSuite extends SparkFunSuite { @@ -32,9 +36,10 @@ class SparkHadoopUtilSuite extends SparkFunSuite { val hadoopConf = new Configuration(false) sc.set("spark.hadoop.orc.filterPushdown", "true") new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf) - assertConfigValue(hadoopConf, "orc.filterPushdown", "true" ) - assertConfigValue(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", "true") - assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com") + assertConfigMatches(hadoopConf, "orc.filterPushdown", "true", SOURCE_SPARK_HADOOP) + assertConfigMatches(hadoopConf, "fs.s3a.downgrade.syncable.exceptions", "true", + SET_TO_DEFAULT_VALUES) + assertConfigMatches(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com", SET_TO_DEFAULT_VALUES) } /** @@ -46,7 +51,7 @@ class SparkHadoopUtilSuite extends SparkFunSuite { val hadoopConf = new Configuration(false) 
sc.set("spark.hadoop.fs.s3a.endpoint", "") new SparkHadoopUtil().appendSparkHadoopConfigs(sc, hadoopConf) - assertConfigValue(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com") + assertConfigMatches(hadoopConf, "fs.s3a.endpoint", "s3.amazonaws.com", SET_TO_DEFAULT_VALUES) } /** @@ -67,7 +72,7 @@ class SparkHadoopUtilSuite extends SparkFunSuite { * If the endpoint region is set (even to a blank string) in * "spark.hadoop.fs.s3a.endpoint.region" then the endpoint is not set, * even when the s3a endpoint is "". - * This supports a feature in later hadoop versions where this configuration + * This supports a feature in hadoop 3.3.1 where this configuration * pair triggers a revert to the "SDK to work out the region" algorithm, * which works on EC2 deployments. */ @@ -80,6 +85,44 @@ class SparkHadoopUtilSuite extends SparkFunSuite { assertConfigValue(hadoopConf, "fs.s3a.endpoint", null) } + /** + * spark.hive.* is passed to the hadoop config as hive.*. + */ + test("SPARK-40640: spark.hive propagation") { + val sc = new SparkConf() + val hadoopConf = new Configuration(false) + sc.set("spark.hive.hiveoption", "value") + new SparkHadoopUtil().appendS3AndSparkHadoopHiveConfigurations(sc, hadoopConf) + assertConfigMatches(hadoopConf, "hive.hiveoption", "value", SOURCE_SPARK_HIVE) + } + + /** + * The explicit buffer size propagation records this. 
+ */ + test("SPARK-40640: buffer size propagation") { + val sc = new SparkConf() + val hadoopConf = new Configuration(false) + sc.set(BUFFER_SIZE.key, "123") + new SparkHadoopUtil().appendS3AndSparkHadoopHiveConfigurations(sc, hadoopConf) + assertConfigMatches(hadoopConf, "io.file.buffer.size", "123", BUFFER_SIZE.key) + } + + test("SPARK-40640: aws credentials from environment variables") { + val hadoopConf = new Configuration(false) + SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf, + "access-key", "secret-key", "session-token") + val source = "Set by Spark on " + InetAddress.getLocalHost + " from " + assertConfigMatches(hadoopConf, "fs.s3a.access.key", "access-key", source) + assertConfigMatches(hadoopConf, "fs.s3a.secret.key", "secret-key", source) + assertConfigMatches(hadoopConf, "fs.s3a.session.token", "session-token", source) + } + + test("SPARK-19739: S3 session token propagation requires access and secret keys") { + val hadoopConf = new Configuration(false) + SparkHadoopUtil.appendS3CredentialsFromEnvironment(hadoopConf, null, null, "session-token") + assertConfigValue(hadoopConf, "fs.s3a.session.token", null) + } + /** * Assert that a hadoop configuration option has the expected value. * @param hadoopConf configuration to query @@ -87,10 +130,46 @@ class SparkHadoopUtilSuite extends SparkFunSuite { * @param expected expected value. */ private def assertConfigValue( - hadoopConf: Configuration, - key: String, - expected: String): Unit = { + hadoopConf: Configuration, + key: String, + expected: String): Unit = { assert(hadoopConf.get(key) === expected, s"Mismatch in expected value of $key") } + + /** + * Assert that a hadoop configuration option has the expected value + * and has the expected source. + * + * @param hadoopConf configuration to query + * @param key key to look up + * @param expected expected value. 
+ * @param expectedSource string required to be in the property source string + */ + private def assertConfigMatches( + hadoopConf: Configuration, + key: String, + expected: String, + expectedSource: String): Unit = { + assertConfigValue(hadoopConf, key, expected) + assertConfigSourceContains(hadoopConf, key, expectedSource) + } + + /** + * Assert that a source of a configuration matches a specific string. + * @param hadoopConf hadoop configuration + * @param key key to probe + * @param expectedSource expected source + */ + private def assertConfigSourceContains( + hadoopConf: Configuration, + key: String, + expectedSource: String): Unit = { + val v = hadoopConf.get(key) + // get the source list + val origin = SparkHadoopUtil.propertySources(hadoopConf, key) + assert(origin.nonEmpty, s"Sources are missing for '$key' with value '$v'") + assert(origin.contains(expectedSource), + s"Expected source $key with value $v: and source $origin to contain $expectedSource") + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index db600bcd3d4..f6dc35131b7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.TableIdentifier @@ -1303,7 +1304,7 @@ private[hive] object HiveClientImpl extends Logging { // 3: we set all entries in config to this hiveConf. 
val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++ sparkConf.getAll.toMap ++ extraConfig).toMap - confMap.foreach { case (k, v) => hiveConf.set(k, v) } + confMap.foreach { case (k, v) => hiveConf.set(k, v, SOURCE_SPARK) } SQLConf.get.redactOptions(confMap).foreach { case (k, v) => logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive Conf:$k=$v") } @@ -1321,7 +1322,7 @@ private[hive] object HiveClientImpl extends Logging { if (hiveConf.get("hive.execution.engine") == "tez") { logWarning("Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr'" + " to disable useless hive logic") - hiveConf.set("hive.execution.engine", "mr") + hiveConf.set("hive.execution.engine", "mr", SOURCE_SPARK) } hiveConf } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org