[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to reference conf, env, system props.

2016-07-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14022





[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to reference conf, env, system props.

2016-07-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14022#discussion_r71197993
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---
@@ -99,13 +133,66 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
+    private[config] val fallback: ConfigEntry[T])
     extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in ConfigEntry.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    REF_RE.replaceAllIn(value, { m =>
+      val prefix = m.group(1)
+      val name = m.group(2)
+      val replacement = prefix match {
+        case null =>
+          require(!usedRefs.contains(name), s"Circular reference in $value: $name")
+          if (name.startsWith("spark.")) {
+            Option(findEntry(name))
+              .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
+              .orElse(Option(conf.get(name)))
+              .orElse(defaultValueString(name))
+          } else {
+            None
+          }
+        case "system" => sys.props.get(name)
+        case "env" => Option(getenv(name))
+        case _ => throw new IllegalArgumentException(s"Invalid prefix: $prefix")
--- End diff --

Yeah, I have to take a look at this. Throwing might actually break some existing code in SparkHadoopUtil.
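
For illustration, a minimal sketch of the lenient alternative being weighed here, assuming an unknown prefix should leave the reference text untouched rather than fail (an assumption for this sketch, not necessarily the behavior that was merged; the spark.* lookup/recursion from the patch is elided):

    import java.util.{Map => JMap}
    import scala.util.matching.Regex

    object LenientExpand {
      private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

      // Lenient variant: an unknown prefix yields no replacement, so the
      // original "${prefix:name}" text survives instead of throwing.
      def expand(value: String, conf: JMap[String, String], getenv: String => String): String = {
        REF_RE.replaceAllIn(value, m => {
          val replacement = m.group(1) match {
            case null     => Option(conf.get(m.group(2)))
            case "system" => sys.props.get(m.group(2))
            case "env"    => Option(getenv(m.group(2)))
            case _        => None  // was: throw new IllegalArgumentException(...)
          }
          // quoteReplacement keeps "$" in the result from being read as a
          // group reference by replaceAllIn.
          Regex.quoteReplacement(replacement.getOrElse(m.matched))
        })
      }
    }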





[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to reference conf, env, system props.

2016-07-18 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14022#discussion_r71197308
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---
@@ -99,13 +133,66 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
+    private[config] val fallback: ConfigEntry[T])
     extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in ConfigEntry.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    REF_RE.replaceAllIn(value, { m =>
+      val prefix = m.group(1)
+      val name = m.group(2)
+      val replacement = prefix match {
+        case null =>
+          require(!usedRefs.contains(name), s"Circular reference in $value: $name")
+          if (name.startsWith("spark.")) {
+            Option(findEntry(name))
+              .flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
+              .orElse(Option(conf.get(name)))
+              .orElse(defaultValueString(name))
+          } else {
+            None
+          }
+        case "system" => sys.props.get(name)
+        case "env" => Option(getenv(name))
+        case _ => throw new IllegalArgumentException(s"Invalid prefix: $prefix")
--- End diff --

Should this throw?





[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to reference conf, env, system props.

2016-07-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14022#discussion_r70728894
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---
@@ -99,13 +118,83 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
-    extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+    private[config] val fallback: ConfigEntry[T],
+    private val expandVars: Boolean)
+    extends ConfigEntry[T](
+      key,
+      fallback.valueConverter,
+      fallback.stringConverter,
+      doc,
+      isPublic,
+      expandVars) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r.pattern
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in `ConfigBuilder.withVariableExpansion`.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    val matcher = REF_RE.matcher(value)
+    val result = new StringBuilder()
+    var end = 0
+
+    while (end < value.length() && matcher.find(end)) {
--- End diff --

That looks interesting, let me take a look.





[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to reference conf, env, system props.

2016-07-13 Thread ericl
Github user ericl commented on a diff in the pull request:

https://github.com/apache/spark/pull/14022#discussion_r70728529
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala ---
@@ -99,13 +118,83 @@ private class FallbackConfigEntry[T] (
     key: String,
     doc: String,
     isPublic: Boolean,
-    private val fallback: ConfigEntry[T])
-    extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+    private[config] val fallback: ConfigEntry[T],
+    private val expandVars: Boolean)
+    extends ConfigEntry[T](
+      key,
+      fallback.valueConverter,
+      fallback.stringConverter,
+      doc,
+      isPublic,
+      expandVars) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
 
-  override def readFrom(conf: SparkConf): T = {
-    conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+  override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
+    Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
+  }
+
+}
+
+private object ConfigEntry {
+
+  private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
+
+  private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r.pattern
+
+  def registerEntry(entry: ConfigEntry[_]): Unit = {
+    val existing = knownConfigs.putIfAbsent(entry.key, entry)
+    require(existing == null, s"Config entry ${entry.key} already registered!")
+  }
+
+  def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
+
+  /**
+   * Expand the `value` according to the rules explained in `ConfigBuilder.withVariableExpansion`.
+   */
+  def expand(
+      value: String,
+      conf: JMap[String, String],
+      getenv: String => String,
+      usedRefs: Set[String]): String = {
+    val matcher = REF_RE.matcher(value)
+    val result = new StringBuilder()
+    var end = 0
+
+    while (end < value.length() && matcher.find(end)) {
--- End diff --

I think you can use [Regex#replaceAllIn](http://daily-scala.blogspot.com/2010/02/regex-replaceallin.html) here instead of a loop.
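
A sketch of that suggestion, with a hypothetical `resolve` callback standing in for the per-reference lookup done in this patch:

    import scala.util.matching.Regex

    val RefRe: Regex = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r

    // replaceAllIn visits every match and splices in the replacer's result,
    // so no manual Matcher/StringBuilder loop is needed. Unresolved
    // references are kept verbatim via m.matched.
    def expandAll(value: String, resolve: (String, String) => Option[String]): String =
      RefRe.replaceAllIn(value, m =>
        Regex.quoteReplacement(resolve(m.group(1), m.group(2)).getOrElse(m.matched)))

    // e.g. expandAll("${env:HOME}/logs", (p, n) => if (p == "env") sys.env.get(n) else None)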





[GitHub] spark pull request #14022: [SPARK-16272][core] Allow config values to reference conf, env, system props.

2016-07-01 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/14022

[SPARK-16272][core] Allow config values to reference conf, env, system props.

This allows configuration to be more flexible when the cluster does not
have a homogeneous configuration (e.g. packages are installed on different
paths in different nodes). By allowing one to reference the environment from
the conf, it becomes possible to work around those in certain cases.

The feature is hooked up to spark.sql.hive.metastore.jars to show how to use it, and because it's an example of said scenario that I ran into. It uses a new "pathConf" config type that is a shorthand for enabling variable expansion on string configs.
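
A hedged illustration of the reference syntax (the spark.example.* keys are invented for the example; whether a given key actually expands depends on how its ConfigEntry is declared):

    import org.apache.spark.SparkConf

    // Values may reference environment variables, JVM system properties,
    // or other spark.* settings using the ${prefix:name} syntax.
    val conf = new SparkConf()
      .set("spark.sql.hive.metastore.jars", "${env:HIVE_HOME}/lib/*")
      .set("spark.example.work.dir", "${system:java.io.tmpdir}/spark")
      .set("spark.example.history.dir", "${spark.example.work.dir}/history")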

As part of the implementation, ConfigEntry now keeps track of all "known" configs (i.e. those created through the use of ConfigBuilder), since that list is used by the resolution code. This duplicates some code in SQLConf, which could potentially be merged with this now. It will also make it simpler to implement some missing features such as filtering which configs show up in the UI or in event logs - which are not part of this change.

Another change is in the way ConfigEntry reads config data; it now takes a string map and a function that reads env variables, so that it can be called both from SparkConf and SQLConf. This makes it so both places follow the same read path, instead of having to replicate certain logic in SQLConf. There are still a couple of methods in SQLConf that peek into fields of ConfigEntry directly, though.
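
A small sketch of that shared read path (assumes an already-registered ConfigEntry[String] named `entry`; only the readFrom signature below comes from this patch):

    import java.util.{HashMap => JHashMap}

    // Both SparkConf and SQLConf can back a read with nothing more than a
    // String map plus an env lookup that may return null for missing names.
    val settings = new JHashMap[String, String]()
    settings.put("spark.example.path", "${env:SPARK_HOME}/work")

    val resolved: String = entry.readFrom(settings, name => System.getenv(name))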

Tested via unit tests, and by using the new variable expansion functionality
in a shell session with a custom spark.sql.hive.metastore.jars value.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-16272

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14022


commit f4e772ad2b5c8231c48b2cd1f8f75c81e754c2b4
Author: Marcelo Vanzin 
Date:   2016-06-28T22:39:49Z

[SPARK-16272][core] Allow config values to reference conf, env, system props.




