[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to refere...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14022

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14022#discussion_r71197993

--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---

@@ -99,13 +133,66 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
+    private[config] val fallback: ConfigEntry[T])
   extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

   override def defaultValueString: String = s""

-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in ConfigEntry.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    REF_RE.replaceAllIn(value, { m =>
+      val prefix = m.group(1)
+      val name = m.group(2)
+      val replacement = prefix match {
+        case null =>
+          require(!usedRefs.contains(name), s"Circular reference in $value: $name")
+          if (name.startsWith("spark.")) {
+            Option(findEntry(name))
+              .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
+              .orElse(Option(conf.get(name)))
+              .orElse(defaultValueString(name))
+          } else {
+            None
+          }
+        case "system" => sys.props.get(name)
+        case "env" => Option(getenv(name))
+        case _ => throw new IllegalArgumentException(s"Invalid prefix: $prefix")
--- End diff --

Yeah, I have to take a look at this. Throwing might actually break some existing code in SparkHadoopUtil.
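The trade-off being discussed can be seen in isolation. Below is a standalone sketch of the same `${prefix:name}` expansion, not Spark's actual ConfigEntry code (ExpandSketch is a made-up name, and a bare `${name}` is simplified to a direct map lookup), with the lenient alternative: an unrecognized prefix leaves the reference untouched instead of throwing, which is the behavior pre-existing callers such as SparkHadoopUtil may rely on.

```scala
// Standalone sketch of ${prefix:name} expansion (not Spark's ConfigEntry code).
// Lenient variant: an unknown prefix or unresolvable name is left as-is.
import scala.util.matching.Regex

object ExpandSketch {
  private val RefRe: Regex = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

  def expand(value: String, conf: Map[String, String], getenv: String => String): String =
    RefRe.replaceAllIn(value, { m =>
      val prefix = m.group(1)
      val name = m.group(2)
      val replacement = prefix match {
        case null     => conf.get(name)       // bare reference: another config key
        case "system" => sys.props.get(name)  // ${system:...}: JVM system properties
        case "env"    => Option(getenv(name)) // ${env:...}: environment variables
        case _        => None                 // lenient: unknown prefix stays as-is
      }
      // quoteReplacement so "$" or "\" in the substituted text is taken literally
      Regex.quoteReplacement(replacement.getOrElse(m.matched))
    })
}
```

With this shape, `${bogus:x}` simply survives expansion unchanged, so callers that pass through strings containing `${...}`-like syntax keep working.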
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14022#discussion_r71197308

--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---

@@ -99,13 +133,66 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
+    private[config] val fallback: ConfigEntry[T])
   extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

   override def defaultValueString: String = s""

-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in ConfigEntry.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    REF_RE.replaceAllIn(value, { m =>
+      val prefix = m.group(1)
+      val name = m.group(2)
+      val replacement = prefix match {
+        case null =>
+          require(!usedRefs.contains(name), s"Circular reference in $value: $name")
+          if (name.startsWith("spark.")) {
+            Option(findEntry(name))
+              .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
+              .orElse(Option(conf.get(name)))
+              .orElse(defaultValueString(name))
+          } else {
+            None
+          }
+        case "system" => sys.props.get(name)
+        case "env" => Option(getenv(name))
+        case _ => throw new IllegalArgumentException(s"Invalid prefix: $prefix")
--- End diff --

Should this throw?
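Separately from the throwing question, the `usedRefs` parameter in the diff above is what detects circular references. Here is a self-contained sketch of that idea (CycleSketch is a made-up name; the real code threads `usedRefs` through `readAndExpand` rather than recursing over a plain map):

```scala
// Sketch of usedRefs-style cycle detection: expanding a key that is already on
// the current expansion path means the references form a cycle, so fail fast.
import scala.util.matching.Regex

object CycleSketch {
  private val RefRe: Regex = "\\$\\{(\\S+?)\\}".r

  def expand(value: String, conf: Map[String, String], used: Set[String] = Set.empty): String =
    RefRe.replaceAllIn(value, { m =>
      val name = m.group(1)
      // Same guard as the require() in the diff above.
      require(!used.contains(name), s"Circular reference in $value: $name")
      val expanded = conf.get(name).map(v => expand(v, conf, used + name))
      Regex.quoteReplacement(expanded.getOrElse(m.matched))
    })
}
```

Because the set is extended per reference chain (`used + name`) rather than mutated globally, two independent references to the same key in one value are still allowed; only an actual cycle trips the `require`.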
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/14022#discussion_r70728894

--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---

@@ -99,13 +118,83 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
-  extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+    private[config] val fallback: ConfigEntry[T],
+    private val expandVars: Boolean)
+  extends ConfigEntry[T](
+    key,
+    fallback.valueConverter,
+    fallback.stringConverter,
+    doc,
+    isPublic,
+    expandVars) {

   override def defaultValueString: String = s""

-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r.pattern
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in `ConfigBuilder.withVariableExpansion`.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    val matcher = REF_RE.matcher(value)
+    val result = new StringBuilder()
+    var end = 0
+
+    while (end < value.length() && matcher.find(end)) {
--- End diff --

That looks interesting, let me take a look.
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/14022#discussion_r70728529

--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---

@@ -99,13 +118,83 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
-  extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+    private[config] val fallback: ConfigEntry[T],
+    private val expandVars: Boolean)
+  extends ConfigEntry[T](
+    key,
+    fallback.valueConverter,
+    fallback.stringConverter,
+    doc,
+    isPublic,
+    expandVars) {

   override def defaultValueString: String = s""

-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r.pattern
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in `ConfigBuilder.withVariableExpansion`.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    val matcher = REF_RE.matcher(value)
+    val result = new StringBuilder()
+    var end = 0
+
+    while (end < value.length() && matcher.find(end)) {
--- End diff --

I think you can use [Regex#replaceAllIn](http://daily-scala.blogspot.com/2010/02/regex-replaceallin.html) here instead of a loop.
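The suggestion refers to scala.util.matching.Regex#replaceAllIn, which takes a `Match => String` callback and performs the find/append loop internally. A minimal, hypothetical illustration (the template syntax and names here are made up, unrelated to Spark's code):

```scala
// replaceAllIn removes the need for manual Matcher.find()/StringBuilder
// bookkeeping: each match is handed to a callback that returns its replacement.
import scala.util.matching.Regex

object ReplaceAllInDemo {
  private val RefRe: Regex = "\\$\\{(\\w+)\\}".r

  def substitute(template: String, values: Map[String, String]): String =
    RefRe.replaceAllIn(template, m =>
      // quoteReplacement keeps "$" in a value from being read as a group reference
      Regex.quoteReplacement(values.getOrElse(m.group(1), m.matched)))
}
```

One caveat: the callback's result is interpreted as a replacement pattern (Java Matcher semantics, where `$1` means group 1), so `Regex.quoteReplacement` is needed whenever substituted values may contain `$` or `\`.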
GitHub user vanzin opened a pull request: https://github.com/apache/spark/pull/14022

[SPARK-16272][core] Allow config values to reference conf, env, system props.

This allows configuration to be more flexible when the cluster does not have a homogeneous configuration (e.g. packages are installed on different paths on different nodes). By allowing one to reference the environment from the conf, it becomes possible to work around those issues in certain cases. The feature is hooked up to spark.sql.hive.metastore.jars to show how to use it, and because it's an example of said scenario that I ran into. It uses a new "pathConf" config type that is a shorthand for enabling variable expansion on string configs.

As part of the implementation, ConfigEntry now keeps track of all "known" configs (i.e. those created through the use of ConfigBuilder), since that list is used by the resolution code. This duplicates some code in SQLConf, which could potentially be merged with this now. It will also make it simpler to implement some missing features, such as filtering which configs show up in the UI or in event logs - which are not part of this change.

Another change is in the way ConfigEntry reads config data; it now takes a string map and a function that reads env variables, so that it can be called both from SparkConf and SQLConf. This makes both places follow the same read path, instead of having to replicate certain logic in SQLConf. There are still a couple of methods in SQLConf that peek into fields of ConfigEntry directly, though.

Tested via unit tests, and by using the new variable expansion functionality in a shell session with a custom spark.sql.hive.metastore.jars value.
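As a concrete illustration of the scenario in the description, a cluster-specific value can be written against each node's environment rather than hardcoded. The spark.sql.hive.metastore.jars hookup is part of this change; the HIVE_HOME value below is a made-up example path:

```properties
# Hypothetical spark-defaults.conf entry: ${env:NAME} is resolved from the
# environment of the process reading the config, so each node can point at
# its own local Hive installation.
spark.sql.hive.metastore.jars=${env:HIVE_HOME}/lib/*
```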
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-16272

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14022.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14022

commit f4e772ad2b5c8231c48b2cd1f8f75c81e754c2b4
Author: Marcelo Vanzin
Date: 2016-06-28T22:39:49Z

    [SPARK-16272][core] Allow config values to reference conf, env, system props.