[
https://issues.apache.org/jira/browse/STORM-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179604#comment-15179604
]
ASF GitHub Bot commented on STORM-1269:
---------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1185#discussion_r55006975
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.Thrift;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.generated.*;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.metric.EventLoggerBolt;
+import org.apache.storm.metric.MetricsConsumerBolt;
+import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.testing.NonRichBoltTracker;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.IPredicate;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class StormCommon {
+ // A singleton instance allows us to mock delegated static methods in
our
+ // tests by subclassing.
+ private static StormCommon _instance = new StormCommon();
+
+ /**
+ * Provide an instance of this class for delegates to use. To mock out
+ * delegated methods, provide an instance of a subclass that overrides
the
+ * implementation of the delegated method.
+ * @param common a StormCommon instance
+ * @return the previously set instance
+ */
+ public static StormCommon setInstance(StormCommon common) {
+ StormCommon oldInstance = _instance;
+ _instance = common;
+ return oldInstance;
+ }
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StormCommon.class);
+
+ public static final String ACKER_COMPONENT_ID =
Acker.ACKER_COMPONENT_ID;
+ public static final String ACKER_INIT_STREAM_ID =
Acker.ACKER_INIT_STREAM_ID;
+ public static final String ACKER_ACK_STREAM_ID =
Acker.ACKER_ACK_STREAM_ID;
+ public static final String ACKER_FAIL_STREAM_ID =
Acker.ACKER_FAIL_STREAM_ID;
+
+ public static final String SYSTEM_STREAM_ID = "__system";
+
+ public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
+ public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
+
+ public static void startMetricsReporter(PreparableReporter report, Map
conf) {
+ report.prepare(new MetricRegistry(), conf);
+ report.start();
+ LOG.info("Started statistics report plugin...");
+ }
+
+ public static void startMetricsReporters(Map conf) {
+ List<PreparableReporter> reporters =
MetricsUtils.getPreparableReporters(conf);
+ for (PreparableReporter reporter : reporters) {
+ startMetricsReporter(reporter, conf);
+ }
+ }
+
+ public static String getTopologyNameById(String topologyId) {
+ String topologyName = null;
+ try {
+ topologyName = topologyIdToName(topologyId);
+ } catch (InvalidTopologyException e) {
+ LOG.error("Invalid topologyId=" + topologyId);
+ }
+ return topologyName;
+ }
+
+ /**
+ * Convert topologyId to topologyName. TopologyId =
topoloygName-counter-timeStamp
+ *
+ * @param topologyId
+ * @return
+ */
+ public static String topologyIdToName(String topologyId) throws
InvalidTopologyException {
+ String ret = null;
+ int index = topologyId.lastIndexOf('-');
+ if (index != -1 && index > 2) {
+ index = topologyId.lastIndexOf('-', index - 1);
+ if (index != -1 && index > 0)
+ ret = topologyId.substring(0, index);
+ else
+ throw new InvalidTopologyException(topologyId + " is not a
valid topologyId");
+ } else
+ throw new InvalidTopologyException(topologyId + " is not a
valid topologyId");
+ return ret;
+ }
+
+ public static String getStormId(IStormClusterState stormClusterState,
final String topologyName) {
+ List<String> activeTopologys = stormClusterState.activeStorms();
+ IPredicate pred = new IPredicate<String>() {
+ @Override
+ public boolean test(String obj) {
+ return obj != null ?
getTopologyNameById(obj).equals(topologyName) : false;
+ }
+ };
+ return Utils.findOne(pred, activeTopologys);
+ }
+
+ public static Map<String, StormBase> topologyBases(IStormClusterState
stormClusterState) {
+ return _instance.topologyBasesImpl(stormClusterState);
+ }
+
+ protected Map<String, StormBase> topologyBasesImpl(IStormClusterState
stormClusterState) {
+ List<String> activeTopologys = stormClusterState.activeStorms();
+ Map<String, StormBase> stormBases = new HashMap<String,
StormBase>();
+ if (activeTopologys != null) {
+ for (String topologyId : activeTopologys) {
+ StormBase base = stormClusterState.stormBase(topologyId,
null);
+ if (base != null) {
+ stormBases.put(topologyId, base);
+ }
+ }
+ }
+ return stormBases;
+ }
+
+ public static void validateDistributedMode(Map conf) {
+ if (ConfigUtils.isLocalMode(conf)) {
+ throw new IllegalArgumentException("Cannot start server in
local mode!");
+ }
+ }
+
+ private static void validateIds(StormTopology topology) throws
InvalidTopologyException {
+ List<String> componentIds = new ArrayList<String>();
+
+ for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+ if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+ Object value = topology.getFieldValue(field);
+ if (value != null) {
+ Map<String, Object> componentMap = (Map<String,
Object>) value;
+ componentIds.addAll(componentMap.keySet());
+
+ for (String id : componentMap.keySet()) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + " is
not a valid component id.");
+ }
+ }
+ for (Object componentObj : componentMap.values()) {
+ ComponentCommon common =
getComponentCommon(componentObj);
+ Set<String> streamIds =
common.get_streams().keySet();
+ for (String id : streamIds) {
+ if (Utils.isSystemId(id)) {
+ throw new InvalidTopologyException(id + "
is not a valid stream id.");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ List<String> offending = Utils.getRepeat(componentIds);
+ if (offending.isEmpty() == false) {
+ throw new InvalidTopologyException("Duplicate component ids: "
+ offending);
+ }
+ }
+
+ private static boolean isEmptyInputs(ComponentCommon common) {
+ if (common == null) {
+ return true;
+ } else if (common.get_inputs() == null) {
+ return true;
+ } else {
+ return common.get_inputs().isEmpty();
+ }
+ }
+
+ public static Map<String, Object> allComponents(StormTopology
topology) {
+ Map<String, Object> components = new HashMap<String, Object>();
+ List<StormTopology._Fields> topologyFields =
Arrays.asList(Thrift.getTopologyFields());
+ for (StormTopology._Fields field : topologyFields) {
+ if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+ components.putAll(((Map) topology.getFieldValue(field)));
+ }
+ }
+ return components;
+ }
+
+ public static Map componentConf(Object component) {
+ Map<Object, Object> conf = new HashMap<Object, Object>();
+ ComponentCommon common = getComponentCommon(component);
+ if (common != null) {
+ String jconf = common.get_json_conf();
+ if (jconf != null) {
+ conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
+ }
+ }
+ return conf;
+ }
+
+ public static void validateBasic(StormTopology topology) throws
InvalidTopologyException {
+ validateIds(topology);
+
+ List<StormTopology._Fields> spoutFields =
Arrays.asList(Thrift.getSpoutFields());
+ for (StormTopology._Fields field : spoutFields) {
+ Map<String, Object> spoutComponents = (Map<String, Object>)
topology.getFieldValue(field);
+ if (spoutComponents != null) {
+ for (Object obj : spoutComponents.values()) {
+ ComponentCommon common = getComponentCommon(obj);
+ if (isEmptyInputs(common) == false) {
+ throw new InvalidTopologyException("May not
declare inputs for a spout");
+ }
+ }
+ }
+ }
+
+ Map<String, Object> componentMap = allComponents(topology);
+ for (Object componentObj : componentMap.values()) {
+ Map conf = componentConf(componentObj);
+ ComponentCommon common = getComponentCommon(componentObj);
+ if (common != null) {
+ int parallelismHintNum = Thrift.getParallelismHint(common);
+ Integer taskNum =
Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
+ if (taskNum > 0 && parallelismHintNum <= 0) {
+ throw new InvalidTopologyException("Number of
executors must be greater than 0 when number of tasks is greater than 0");
+ }
+ }
+ }
+ }
+
+ private static Set<String> getStreamOutputFields(Map<String,
StreamInfo> streams) {
+ Set<String> outputFields = new HashSet<String>();
+ if (streams != null) {
+ for (StreamInfo streamInfo : streams.values()) {
+ outputFields.addAll(streamInfo.get_output_fields());
+ }
+ }
+ return outputFields;
+ }
+
+ public static void validateStructure(StormTopology topology) throws
InvalidTopologyException {
+ Map<String, Object> componentMap = allComponents(topology);
+ for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
+ String componentId = entry.getKey();
+ ComponentCommon common = getComponentCommon(entry.getValue());
+ if (common != null) {
+ Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+ for (Map.Entry<GlobalStreamId, Grouping> input :
inputs.entrySet()) {
+ String sourceStreamId = input.getKey().get_streamId();
+ String sourceComponentId =
input.getKey().get_componentId();
+ if(componentMap.keySet().contains(sourceComponentId)
== false) {
+ throw new InvalidTopologyException("Component: ["
+ componentId + "] subscribes from non-existent component [" +
sourceComponentId + "]");
+ }
+
+ ComponentCommon sourceComponent =
getComponentCommon(componentMap.get(sourceComponentId));
+ if (sourceComponent == null ||
sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
+ throw new InvalidTopologyException("Component: ["
+ componentId + "] subscribes from non-existent stream: " +
+ "[" + sourceStreamId + "] of component ["
+ sourceComponentId + "]");
+ }
+
+ Grouping grouping = input.getValue();
+ if (Thrift.groupingType(grouping) ==
Grouping._Fields.FIELDS) {
+ List<String> fields = grouping.get_fields();
+ Map<String, StreamInfo> streams =
sourceComponent.get_streams();
+ Set<String> sourceOutputFields =
getStreamOutputFields(streams);
+ if (sourceOutputFields.containsAll(fields) ==
false) {
+ throw new InvalidTopologyException("Component:
[" + componentId + "] subscribes from stream: [" + sourceStreamId +"] of
component " +
+ "[" + sourceComponentId + "] + with
non-existent fields: " + fields);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology
topology) {
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId,
Grouping>();
+ Set<String> boltIds = topology.get_bolts().keySet();
+ Set<String> spoutIds = topology.get_spouts().keySet();
+
+ for(String id : spoutIds) {
+ inputs.put(Utils.getGlobalStreamId(id, ACKER_INIT_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ }
+
+ for(String id : boltIds) {
+ inputs.put(Utils.getGlobalStreamId(id, ACKER_ACK_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ inputs.put(Utils.getGlobalStreamId(id, ACKER_FAIL_STREAM_ID),
Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+ }
+ return inputs;
+ }
+
+ public static String clusterId = null;
+ public static IBolt makeAckerBolt() {
+ return _instance.makeAckerBoltImpl();
+ }
+ public IBolt makeAckerBoltImpl() {
+ return new Acker();
--- End diff --
This can be inlined.
> port backtype.storm.daemon.common to java
> -----------------------------------------
>
> Key: STORM-1269
> URL: https://issues.apache.org/jira/browse/STORM-1269
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Basti Liu
> Labels: java-migration, jstorm-merger
>
> Common utils shared by the daemons (Some things should just use the Thrift
> object)
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java
> is similar but not exactly the same.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)