This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new b6ed6ad STORM-3066: Implement support for using list elements in properties new dfb9b55 Merge pull request #3155 from efgpinto/STORM-3066 b6ed6ad is described below commit b6ed6ade4f75863b2607d4478898449c469bb662 Author: Eduardo Pinto <eduardo.pi...@blip.pt> AuthorDate: Sun Oct 27 21:16:55 2019 +0000 STORM-3066: Implement support for using list elements in properties --- .../org/apache/storm/flux/parser/FluxParser.java | 103 ++++++++++++--------- .../test/java/org/apache/storm/flux/TCKTest.java | 5 + .../test/resources/configs/substitution-test.yaml | 2 + 3 files changed, 68 insertions(+), 42 deletions(-) diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 8299c14..50570e1 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -18,12 +18,17 @@ package org.apache.storm.flux.parser; -import java.io.ByteArrayOutputStream; +import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.storm.flux.model.BoltDef; import org.apache.storm.flux.model.IncludeDef; @@ -40,17 +45,20 @@ import org.yaml.snakeyaml.constructor.Constructor; */ public class FluxParser { private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class); + private static final Pattern propertyPattern = + Pattern.compile(".*\\$\\{(?<var>ENV-(?<envVar>.+)|(?<list>.+)\\[(?<listIndex>\\d+)]|.+)}.*"); private FluxParser() { } /** * Parse a flux topology definition. - * @param inputFile source YAML file - * @param dumpYaml if true, dump the parsed YAML to stdout + * + * @param inputFile source YAML file + * @param dumpYaml if true, dump the parsed YAML to stdout * @param processIncludes whether or not to process includes - * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param properties properties file for variable substitution + * @param envSub whether or not to perform environment variable substitution * @return resulting topologuy definition * @throws IOException if there is a problem reading file(s) */ @@ -65,11 +73,12 @@ public class FluxParser { /** * Parse a flux topology definition from a classpath resource.. - * @param resource YAML resource - * @param dumpYaml if true, dump the parsed YAML to stdout + * + * @param resource YAML resource + * @param dumpYaml if true, dump the parsed YAML to stdout * @param processIncludes whether or not to process includes - * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param properties properties file for variable substitution + * @param envSub whether or not to perform environment variable substitution * @return resulting topologuy definition * @throws IOException if there is a problem reading file(s) */ @@ -84,11 +93,12 @@ public class FluxParser { /** * Parse a flux topology definition. - * @param inputStream InputStream representation of YAML file - * @param dumpYaml if true, dump the parsed YAML to stdout + * + * @param inputStream InputStream representation of YAML file + * @param dumpYaml if true, dump the parsed YAML to stdout * @param processIncludes whether or not to process includes - * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param properties properties file for variable substitution + * @param envSub whether or not to perform environment variable substitution * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ @@ -116,10 +126,11 @@ public class FluxParser { /** * Parse filter properties file. + * * @param propertiesFile properties file for variable substitution - * @param resource whether or not to load properties file from classpath + * @param resource whether or not to load properties file from classpath * @return resulting filter properties - * @throws IOException if there is a problem reading file + * @throws IOException if there is a problem reading file */ public static Properties parseProperties(String propertiesFile, boolean resource) throws IOException { Properties properties = null; @@ -140,36 +151,43 @@ public class FluxParser { } private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties, boolean envSubstitution) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); LOG.info("loading YAML from input stream..."); - int b = -1; - while ((b = in.read()) != -1) { - bos.write(b); - } + try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String conf = reader.lines().map(line -> { + Matcher m = propertyPattern.matcher(line); + return m.find() + ? getPropertyReplacement(properties, m, envSubstitution) + .map(propValue -> line.replace("${" + m.group("var") + "}", propValue)) + .orElseGet(() -> { + LOG.warn("Could not find replacement for property: " + m.group("var")); + return line; + }) + : line; + }).collect(Collectors.joining(System.lineSeparator())); - // TODO substitution implementation is not exactly efficient or kind to memory... - String str = bos.toString(); - // properties file substitution - if (properties != null) { - LOG.info("Performing property substitution."); - for (Object key : properties.keySet()) { - str = str.replace("${" + key + "}", properties.getProperty((String)key)); - } - } else { - LOG.info("Not performing property substitution."); + return (TopologyDef) yaml.load(conf); } + } - // environment variable substitution - if (envSubstitution) { - LOG.info("Performing environment variable substitution..."); - Map<String, String> envs = System.getenv(); - for (String key : envs.keySet()) { - str = str.replace("${ENV-" + key + "}", envs.get(key)); - } + private static Optional<String> getPropertyReplacement(Properties properties, Matcher match, boolean envSubstitution) { + if (match.group("listIndex") != null) { + String prop = properties.getProperty(match.group("list")); + return Optional.of(parseListAndExtractElem(prop, match.group("listIndex"))); + } else if (envSubstitution && match.group("envVar") != null) { + String envVar = System.getenv().get(match.group("envVar")); + return Optional.ofNullable(envVar); } else { - LOG.info("Not performing environment variable substitution."); + return Optional.ofNullable(properties.getProperty(match.group("var"))); } - return (TopologyDef) yaml.load(str); + } + + private static String parseListAndExtractElem(String strList, String index) { + String[] listProp = strList.substring(1, strList.length() - 1).split(","); + String listElem = listProp[Integer.parseInt(index)]; + + // remove whitespaces and double quotes from beginning and end of a given string + String trimmed = listElem.trim(); + return trimmed.substring(1, trimmed.length() - 1); } private static void dumpYaml(TopologyDef topology, Yaml yaml) { @@ -191,14 +209,15 @@ public class FluxParser { /** * Process includes contained within a yaml file. + * * @param yaml the yaml parser for parsing the include file(s) * @param topologyDef the topology definition containing (possibly zero) includes - * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param properties properties file for variable substitution + * @param envSub whether or not to perform environment variable substitution * @return The TopologyDef with includes resolved. */ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub) - throws IOException { + throws IOException { //TODO support multiple levels of includes if (topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()) { diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index 90613c9..275a720 100644 --- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -275,6 +275,11 @@ public class TCKTest { Collections.singletonList("A string list"), is(context.getTopologyDef().getConfig().get("list.property.target"))); + //Test substitution where the target type is a List element + assertThat("List element property is not replaced by the expected value", + "A string list", + is(context.getTopologyDef().getConfig().get("list.element.property.target"))); + } @Test diff --git a/flux/flux-core/src/test/resources/configs/substitution-test.yaml b/flux/flux-core/src/test/resources/configs/substitution-test.yaml index 9707936..67ac92a 100644 --- a/flux/flux-core/src/test/resources/configs/substitution-test.yaml +++ b/flux/flux-core/src/test/resources/configs/substitution-test.yaml @@ -45,6 +45,8 @@ config: test.env.value: "${ENV-PATH}" # test variable substitution for list type list.property.target: ${a.list.property} + # test variable substitution for list element + list.element.property.target: ${a.list.property[0]} # spout definitions spouts: