STORM-2153: Add warnings for component names containing '.' to DefaultTopologyValidator; Add StrictTopologyValidator
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5ce45b72 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5ce45b72 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5ce45b72 Branch: refs/heads/1.x-branch Commit: 5ce45b72715da4478fddabdfde12becec1373a93 Parents: cd272c4 Author: P. Taylor Goetz <[email protected]> Authored: Thu Dec 14 14:06:20 2017 -0500 Committer: P. Taylor Goetz <[email protected]> Committed: Thu Dec 14 14:06:20 2017 -0500 ---------------------------------------------------------------------- .../storm/nimbus/DefaultTopologyValidator.java | 36 ++++++++++- .../storm/nimbus/StrictTopologyValidator.java | 65 ++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5ce45b72/storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java b/storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java index 0626cb6..fc0dfac 100644 --- a/storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java +++ b/storm-core/src/jvm/org/apache/storm/nimbus/DefaultTopologyValidator.java @@ -17,15 +17,49 @@ */ package org.apache.storm.nimbus; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; public class DefaultTopologyValidator implements ITopologyValidator { + private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map StormConf){ } @Override - public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + if(topologyName.contains(".")){ + LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_') ); + } + Map<String, SpoutSpec> spouts = topology.get_spouts(); + for(String spoutName : spouts.keySet()){ + if(spoutName.contains(".")){ + LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_') ); + } + SpoutSpec spoutSpec = spouts.get(spoutName); + for(String streamName : spoutSpec.get_common().get_streams().keySet()){ + if(streamName.contains(".")){ + LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); + } + } + } + + Map<String, Bolt> bolts = topology.get_bolts(); + for(String boltName : bolts.keySet()){ + if(boltName.contains(".")){ + LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_') ); + } + Bolt bolt = bolts.get(boltName); + for(String streamName : bolt.get_common().get_streams().keySet()){ + if(streamName.contains(".")){ + LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_') ); + } + } + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/5ce45b72/storm-core/src/jvm/org/apache/storm/nimbus/StrictTopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/StrictTopologyValidator.java b/storm-core/src/jvm/org/apache/storm/nimbus/StrictTopologyValidator.java new file mode 100644 index 0000000..e081a4d --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/nimbus/StrictTopologyValidator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.nimbus; + +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class StrictTopologyValidator implements ITopologyValidator { + private static final Logger LOG = LoggerFactory.getLogger(StrictTopologyValidator.class); + @Override + public void prepare(Map StormConf){ + } + @Override + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + if(topologyName.contains(".")){ + throw new InvalidTopologyException(String.format("Topology name '%s' contains illegal character '.'", topologyName)); + } + Map<String, SpoutSpec> spouts = topology.get_spouts(); + for(String spoutName : spouts.keySet()){ + if(spoutName.contains(".")){ + throw new InvalidTopologyException(String.format("Spout name '%s' contains illegal character '.'", spoutName)); + } + SpoutSpec spoutSpec = spouts.get(spoutName); + for(String streamName : spoutSpec.get_common().get_streams().keySet()){ + if(streamName.contains(".")){ + throw new InvalidTopologyException(String.format("Stream name '%s' contains illegal character '.'", streamName)); + } + } + } + + Map<String, Bolt> bolts = topology.get_bolts(); + for(String boltName : bolts.keySet()){ + if(boltName.contains(".")){ + throw new InvalidTopologyException(String.format("Bolt name '%s' contains illegal character '.'", boltName)); + } + Bolt bolt = bolts.get(boltName); + for(String streamName : bolt.get_common().get_streams().keySet()){ + if(streamName.contains(".")){ + throw new InvalidTopologyException(String.format("Stream name '%s' contains illegal character '.'", streamName)); + } + } + } + } +}
