[
https://issues.apache.org/jira/browse/STORM-1269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179622#comment-15179622
]
ASF GitHub Bot commented on STORM-1269:
---------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1185#discussion_r55007994
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.Thrift;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.generated.*;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.metric.EventLoggerBolt;
+import org.apache.storm.metric.MetricsConsumerBolt;
+import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.testing.NonRichBoltTracker;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.IPredicate;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class StormCommon {
+    // The static entry points of this class forward to this instance, so tests
+    // can substitute a subclass that overrides the delegated *Impl methods.
+    private static StormCommon _instance = new StormCommon();
+
+    /**
+     * Replace the instance that the static delegating methods forward to.
+     * To mock a delegated method, pass a subclass that overrides its
+     * implementation.
+     *
+     * @param common the StormCommon instance to delegate to from now on
+     * @return the instance that was installed before this call
+     */
+    public static StormCommon setInstance(StormCommon common) {
+        final StormCommon previous = _instance;
+        _instance = common;
+        return previous;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class); // logger shared by the static helpers below
+    // Acker component and stream ids, re-exported from the Acker class.
+    public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
+    public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
+    public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
+    public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
+    // Stream id that addSystemStreams declares on every component.
+    public static final String SYSTEM_STREAM_ID = "__system";
+    // Component id of the event logger bolt and the stream id it subscribes to (see addEventLogger).
+    public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
+    public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
+
+    /**
+     * Prepare a single reporter against a newly created {@link MetricRegistry}
+     * and start it.
+     *
+     * @param report the reporter to prepare and start
+     * @param conf   configuration handed to the reporter's prepare step
+     */
+    public static void startMetricsReporter(PreparableReporter report, Map conf) {
+        final MetricRegistry registry = new MetricRegistry();
+        report.prepare(registry, conf);
+        report.start();
+        LOG.info("Started statistics report plugin...");
+    }
+
+    /**
+     * Start every reporter that {@code MetricsUtils.getPreparableReporters}
+     * returns for the given configuration.
+     *
+     * @param conf configuration used to look up and prepare the reporters
+     */
+    public static void startMetricsReporters(Map conf) {
+        Iterator<PreparableReporter> pending = MetricsUtils.getPreparableReporters(conf).iterator();
+        while (pending.hasNext()) {
+            startMetricsReporter(pending.next(), conf);
+        }
+    }
+
+    /**
+     * Best-effort variant of {@link #topologyIdToName(String)}: a malformed id
+     * is logged and reported as {@code null} instead of being thrown.
+     *
+     * @param topologyId the topology id to convert
+     * @return the topology name, or {@code null} when the id is not valid
+     */
+    public static String getTopologyNameById(String topologyId) {
+        String topologyName = null;
+        try {
+            topologyName = topologyIdToName(topologyId);
+        } catch (InvalidTopologyException e) {
+            // Pass the exception along so the reason for the rejection is not lost.
+            LOG.error("Invalid topologyId=" + topologyId, e);
+        }
+        return topologyName;
+    }
+
+ /**
+ * Convert topologyId to topologyName. TopologyId =
topoloygName-counter-timeStamp
+ *
+ * @param topologyId
+ * @return
+ */
+ public static String topologyIdToName(String topologyId) throws
InvalidTopologyException {
+ String ret = null;
+ int index = topologyId.lastIndexOf('-');
+ if (index != -1 && index > 2) {
+ index = topologyId.lastIndexOf('-', index - 1);
+ if (index != -1 && index > 0)
+ ret = topologyId.substring(0, index);
+ else
+ throw new InvalidTopologyException(topologyId + " is not a
valid topologyId");
+ } else
+ throw new InvalidTopologyException(topologyId + " is not a
valid topologyId");
+ return ret;
+ }
+
+    /**
+     * Find the id of the active topology whose name equals {@code topologyName}.
+     *
+     * @param stormClusterState cluster state used to list the active topology ids
+     * @param topologyName      the topology name to look for
+     * @return whatever {@code Utils.findOne} yields for the first matching id
+     */
+    public static String getStormId(IStormClusterState stormClusterState, final String topologyName) {
+        List<String> activeTopologys = stormClusterState.activeStorms();
+        IPredicate pred = new IPredicate<String>() {
+            @Override
+            public boolean test(String obj) {
+                if (obj == null || topologyName == null) {
+                    return false;
+                }
+                // getTopologyNameById returns null for a malformed id, so compare
+                // from the known non-null side to avoid a NullPointerException.
+                return topologyName.equals(getTopologyNameById(obj));
+            }
+        };
+        return Utils.findOne(pred, activeTopologys);
+    }
+
+    /**
+     * Static entry point; forwards to {@link #topologyBasesImpl} on the
+     * currently installed instance.
+     */
+    public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) {
+        return _instance.topologyBasesImpl(stormClusterState);
+    }
+
+    /**
+     * Collect the StormBase of every active topology.
+     *
+     * @param stormClusterState cluster state to query
+     * @return map from topology id to its StormBase; ids whose base lookup
+     *         returns null are left out
+     */
+    protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
+        List<String> activeIds = stormClusterState.activeStorms();
+        Map<String, StormBase> basesById = new HashMap<String, StormBase>();
+        if (activeIds == null) {
+            return basesById;
+        }
+        for (String id : activeIds) {
+            StormBase found = stormClusterState.stormBase(id, null);
+            if (found == null) {
+                continue;
+            }
+            basesById.put(id, found);
+        }
+        return basesById;
+    }
+
+    /**
+     * Reject configurations that are in local mode.
+     *
+     * @param conf the configuration to check
+     * @throws IllegalArgumentException if {@code ConfigUtils.isLocalMode(conf)} is true
+     */
+    public static void validateDistributedMode(Map conf) {
+        final boolean localMode = ConfigUtils.isLocalMode(conf);
+        if (localMode) {
+            throw new IllegalArgumentException("Cannot start server in local mode!");
+        }
+    }
+
+    /**
+     * Check that no component id or stream id is a system id and that no
+     * component id appears more than once across the topology fields.
+     * Worker-hook fields and unset fields are skipped.
+     *
+     * @param topology the topology to check
+     * @throws InvalidTopologyException on a system id or a duplicate component id
+     */
+    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+        List<String> seenIds = new ArrayList<String>();
+
+        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+            if (ThriftTopologyUtils.isWorkerHook(field)) {
+                continue;
+            }
+            Object raw = topology.getFieldValue(field);
+            if (raw == null) {
+                continue;
+            }
+
+            Map<String, Object> byId = (Map<String, Object>) raw;
+            seenIds.addAll(byId.keySet());
+
+            for (String componentId : byId.keySet()) {
+                if (Utils.isSystemId(componentId)) {
+                    throw new InvalidTopologyException(componentId + " is not a valid component id.");
+                }
+            }
+
+            for (Object component : byId.values()) {
+                ComponentCommon common = getComponentCommon(component);
+                for (String streamId : common.get_streams().keySet()) {
+                    if (Utils.isSystemId(streamId)) {
+                        throw new InvalidTopologyException(streamId + " is not a valid stream id.");
+                    }
+                }
+            }
+        }
+
+        List<String> duplicates = Utils.getRepeat(seenIds);
+        if (!duplicates.isEmpty()) {
+            throw new InvalidTopologyException("Duplicate component ids: " + duplicates);
+        }
+    }
+
+    /**
+     * @param common the component's common section, may be null
+     * @return true when {@code common} is null, its inputs map is null, or
+     *         that map has no entries
+     */
+    private static boolean isEmptyInputs(ComponentCommon common) {
+        return common == null
+                || common.get_inputs() == null
+                || common.get_inputs().isEmpty();
+    }
+
+    /**
+     * Merge the component maps of every non-worker-hook topology field into a
+     * single map.
+     *
+     * @param topology the topology to read
+     * @return a new map from component id to component object
+     */
+    public static Map<String, Object> allComponents(StormTopology topology) {
+        Map<String, Object> components = new HashMap<String, Object>();
+        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+            if (ThriftTopologyUtils.isWorkerHook(field)) {
+                continue;
+            }
+            // An unset field reads back as null and putAll(null) would throw;
+            // validateIds skips null field values in the same way.
+            Object value = topology.getFieldValue(field);
+            if (value != null) {
+                components.putAll((Map<String, Object>) value);
+            }
+        }
+        return components;
+    }
+
+    /**
+     * Parse a component's JSON configuration into a map.
+     *
+     * @param component a topology component object
+     * @return a new map with the parsed entries; empty when the component has
+     *         no common section or no JSON conf
+     */
+    public static Map componentConf(Object component) {
+        Map<Object, Object> parsed = new HashMap<Object, Object>();
+        ComponentCommon common = getComponentCommon(component);
+        if (common == null) {
+            return parsed;
+        }
+        String json = common.get_json_conf();
+        if (json == null) {
+            return parsed;
+        }
+        parsed.putAll((Map<Object, Object>) JSONValue.parse(json));
+        return parsed;
+    }
+
+    /**
+     * Basic validation of a user topology: ids must pass {@code validateIds},
+     * spouts must not declare inputs, and a component that configures tasks
+     * must have a positive parallelism hint.
+     *
+     * @param topology the topology to validate
+     * @throws InvalidTopologyException when any of the checks fails
+     */
+    public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
+        validateIds(topology);
+
+        for (StormTopology._Fields spoutField : Thrift.getSpoutFields()) {
+            Map<String, Object> spouts = (Map<String, Object>) topology.getFieldValue(spoutField);
+            if (spouts == null) {
+                continue;
+            }
+            for (Object spout : spouts.values()) {
+                if (!isEmptyInputs(getComponentCommon(spout))) {
+                    throw new InvalidTopologyException("May not declare inputs for a spout");
+                }
+            }
+        }
+
+        for (Object component : allComponents(topology).values()) {
+            Map conf = componentConf(component);
+            ComponentCommon common = getComponentCommon(component);
+            if (common == null) {
+                continue;
+            }
+            int executors = Thrift.getParallelismHint(common);
+            Integer tasks = Utils.getInt(conf.get(Config.TOPOLOGY_TASKS), 0);
+            if (tasks > 0 && executors <= 0) {
+                throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
+            }
+        }
+    }
+
+    /**
+     * @param streams stream id to stream info, may be null
+     * @return the union of the output fields of all given streams; empty when
+     *         {@code streams} is null
+     */
+    private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
+        Set<String> union = new HashSet<String>();
+        if (streams == null) {
+            return union;
+        }
+        for (StreamInfo info : streams.values()) {
+            union.addAll(info.get_output_fields());
+        }
+        return union;
+    }
+
+    /**
+     * Check that every input subscription of every component points at an
+     * existing component and stream, and that a fields grouping only names
+     * fields that the source component declares.
+     *
+     * @param topology the topology to validate
+     * @throws InvalidTopologyException when a subscription names a missing
+     *         component, a missing stream, or missing fields
+     */
+    public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
+        Map<String, Object> componentMap = allComponents(topology);
+        for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
+            String componentId = entry.getKey();
+            ComponentCommon common = getComponentCommon(entry.getValue());
+            if (common == null) {
+                continue;
+            }
+            Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+            if (inputs == null) {
+                // isEmptyInputs treats a null inputs map as "no inputs"; do the same here.
+                continue;
+            }
+            for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
+                String sourceStreamId = input.getKey().get_streamId();
+                String sourceComponentId = input.getKey().get_componentId();
+                if (!componentMap.containsKey(sourceComponentId)) {
+                    throw new InvalidTopologyException("Component: [" + componentId
+                            + "] subscribes from non-existent component [" + sourceComponentId + "]");
+                }
+
+                ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
+                if (sourceComponent == null || !sourceComponent.get_streams().containsKey(sourceStreamId)) {
+                    throw new InvalidTopologyException("Component: [" + componentId
+                            + "] subscribes from non-existent stream: "
+                            + "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
+                }
+
+                Grouping grouping = input.getValue();
+                if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
+                    List<String> fields = grouping.get_fields();
+                    // Fields are checked against the union of all streams of the source component.
+                    Set<String> sourceOutputFields = getStreamOutputFields(sourceComponent.get_streams());
+                    if (!sourceOutputFields.containsAll(fields)) {
+                        // The message used to contain a literal "+ " left inside the string.
+                        throw new InvalidTopologyException("Component: [" + componentId
+                                + "] subscribes from stream: [" + sourceStreamId + "] of component "
+                                + "[" + sourceComponentId + "] with non-existent fields: " + fields);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Build the acker's input subscriptions: the init stream of every spout and
+     * the ack and fail streams of every bolt, each with a fields grouping on "id".
+     *
+     * @param topology the topology whose spouts and bolts are subscribed to
+     * @return a new map from global stream id to grouping
+     */
+    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
+        Map<GlobalStreamId, Grouping> subscriptions = new HashMap<GlobalStreamId, Grouping>();
+        Set<String> bolts = topology.get_bolts().keySet();
+        Set<String> spouts = topology.get_spouts().keySet();
+
+        for (String spoutId : spouts) {
+            GlobalStreamId initStream = Utils.getGlobalStreamId(spoutId, ACKER_INIT_STREAM_ID);
+            subscriptions.put(initStream, Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        }
+
+        for (String boltId : bolts) {
+            GlobalStreamId ackStream = Utils.getGlobalStreamId(boltId, ACKER_ACK_STREAM_ID);
+            subscriptions.put(ackStream, Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            GlobalStreamId failStream = Utils.getGlobalStreamId(boltId, ACKER_FAIL_STREAM_ID);
+            subscriptions.put(failStream, Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        }
+        return subscriptions;
+    }
+
+    // NOTE(review): not read or written anywhere in the visible part of this class.
+    public static String clusterId = null;
+
+    /**
+     * Static entry point; forwards to {@link #makeAckerBoltImpl()} on the
+     * currently installed instance.
+     */
+    public static IBolt makeAckerBolt() {
+        return _instance.makeAckerBoltImpl();
+    }
+
+    /**
+     * @return a new {@code Acker}; subclasses may override to supply another bolt
+     */
+    public IBolt makeAckerBoltImpl() {
+        final IBolt acker = new Acker();
+        return acker;
+    }
+
+    /**
+     * Add the acker bolt to the topology and wire it up: declare the ack/fail
+     * streams on every bolt, and on every spout declare the init stream,
+     * subscribe to the acker's ack/fail streams and set the tick frequency.
+     *
+     * @param conf     topology configuration (acker executors, workers, message timeout)
+     * @param topology the topology to modify in place
+     */
+    public static void addAcker(Map conf, StormTopology topology) {
+        int ackerExecutors = Utils.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS),
+                Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        Map<GlobalStreamId, Grouping> ackerSubscriptions = ackerInputs(topology);
+
+        Map<String, StreamInfo> ackerStreams = new HashMap<String, StreamInfo>();
+        ackerStreams.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
+        ackerStreams.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
+
+        Map<String, Object> ackerSettings = new HashMap<String, Object>();
+        ackerSettings.put(Config.TOPOLOGY_TASKS, ackerExecutors);
+        ackerSettings.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+
+        Bolt ackerSpec = Thrift.prepareSerializedBoltDetails(
+                ackerSubscriptions, makeAckerBolt(), ackerStreams, ackerExecutors, ackerSettings);
+
+        // Every bolt gets the streams on which it reports acks and failures.
+        for (Bolt userBolt : topology.get_bolts().values()) {
+            ComponentCommon boltCommon = userBolt.get_common();
+            boltCommon.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
+            boltCommon.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+        }
+
+        // Every spout gets the tick setting, the init stream and the acker's reply streams.
+        for (SpoutSpec userSpout : topology.get_spouts().values()) {
+            ComponentCommon spoutCommon = userSpout.get_common();
+            Map spoutSettings = componentConf(userSpout);
+            spoutSettings.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                    Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+            spoutCommon.set_json_conf(JSONValue.toJSONString(spoutSettings));
+            spoutCommon.put_to_streams(ACKER_INIT_STREAM_ID,
+                    Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
+            spoutCommon.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+            spoutCommon.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID),
+                    Thrift.prepareDirectGrouping());
+        }
+
+        topology.put_to_bolts(ACKER_COMPONENT_ID, ackerSpec);
+    }
+
+    /**
+     * Extract the common section of a topology component.
+     *
+     * @param component a StateSpoutSpec, SpoutSpec or Bolt; may be null
+     * @return the component's common section, or null when {@code component}
+     *         is null or of any other type
+     */
+    public static ComponentCommon getComponentCommon(Object component) {
+        if (component instanceof StateSpoutSpec) {
+            return ((StateSpoutSpec) component).get_common();
+        }
+        if (component instanceof SpoutSpec) {
+            return ((SpoutSpec) component).get_common();
+        }
+        if (component instanceof Bolt) {
+            return ((Bolt) component).get_common();
+        }
+        // null and unrecognised component types both end up here
+        return null;
+    }
+
+    /**
+     * Declare the metrics stream, with fields "task-info" and "data-points",
+     * on every component that has a common section.
+     *
+     * @param topology the topology to modify in place
+     */
+    public static void addMetricStreams(StormTopology topology) {
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            if (common == null) {
+                continue;
+            }
+            common.put_to_streams(Constants.METRICS_STREAM_ID,
+                    Thrift.outputFields(Arrays.asList("task-info", "data-points")));
+        }
+    }
+
+    /**
+     * Declare the system stream, with the single field "event", on every
+     * component that has a common section.
+     *
+     * @param topology the topology to modify in place
+     */
+    public static void addSystemStreams(StormTopology topology) {
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            if (common == null) {
+                continue;
+            }
+            common.put_to_streams(SYSTEM_STREAM_ID, Thrift.outputFields(Arrays.asList("event")));
+        }
+    }
+
+    /**
+     * @return the field names of the event log stream, in order: component id,
+     *         message id, timestamp and values (as named by EventLoggerBolt)
+     */
+    public static List<String> eventLoggerBoltFields() {
+        return Arrays.asList(
+                EventLoggerBolt.FIELD_COMPONENT_ID,
+                EventLoggerBolt.FIELD_MESSAGE_ID,
+                EventLoggerBolt.FIELD_TS,
+                EventLoggerBolt.FIELD_VALUES);
+    }
+
+    /**
+     * Build the event logger's input subscriptions: the event log stream of
+     * every bolt and spout, with a fields grouping on "component-id".
+     *
+     * @param topology the topology whose bolts and spouts are subscribed to
+     * @return a new map from global stream id to grouping
+     */
+    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
+        Set<String> sourceIds = new HashSet<String>();
+        sourceIds.addAll(topology.get_bolts().keySet());
+        sourceIds.addAll(topology.get_spouts().keySet());
+
+        Map<GlobalStreamId, Grouping> subscriptions = new HashMap<GlobalStreamId, Grouping>();
+        for (String sourceId : sourceIds) {
+            GlobalStreamId eventStream = Utils.getGlobalStreamId(sourceId, EVENTLOGGER_STREAM_ID);
+            subscriptions.put(eventStream, Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+        }
+        return subscriptions;
+    }
+
+    /**
+     * Add the event logger bolt to the topology and declare the event log
+     * stream on every component that has a common section.
+     *
+     * @param conf     topology configuration (event logger executors, workers, message timeout)
+     * @param topology the topology to modify in place
+     */
+    public static void addEventLogger(Map conf, StormTopology topology) {
+        Integer executors = Utils.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
+                Utils.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+
+        HashMap<String, Object> loggerSettings = new HashMap<String, Object>();
+        loggerSettings.put(Config.TOPOLOGY_TASKS, executors);
+        loggerSettings.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+
+        Bolt loggerSpec = Thrift.prepareSerializedBoltDetails(
+                eventLoggerInputs(topology), new EventLoggerBolt(), null, executors, loggerSettings);
+
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            if (common == null) {
+                continue;
+            }
+            common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
+        }
+        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, loggerSpec);
+    }
+
+ public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf,
StormTopology topology) {
+ Map<String, Bolt> metricsConsumerBolts = new HashMap<String,
Bolt>();
+
+ Set<String> componentIdsEmitMetrics = new HashSet<String>();
+ componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
+ componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
+
+ Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId,
Grouping>();
+ for (String componentId : componentIdsEmitMetrics) {
+ inputs.put(Utils.getGlobalStreamId(componentId,
Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
+ }
+
+ List<Map<String, Object>> registerInfo = (List<Map<String,
Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
+ if (registerInfo != null) {
+ Map<String, Integer> classOccurrencesMap = new HashMap<String,
Integer>();
+ for (Map<String, Object> info : registerInfo) {
+ String className = (String) info.get("class");
+ Object argument = info.get("argument");
+ Integer phintNum =
Utils.getInt(info.get("parallelism.hint"), 1);
+ Map<String, Object> metricsConsumerConf = new
HashMap<String, Object>();
+ metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+ Bolt metricsConsumerBolt =
Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className,
argument), null, phintNum, metricsConsumerConf);
+
+ String id = className;
+ if (classOccurrencesMap.containsKey(className)) {
+ // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\",
\"a#2\"]"
+ int occurrenceNum = classOccurrencesMap.get(className);
+ occurrenceNum++;
+ classOccurrencesMap.put(className, occurrenceNum);
+ id = Constants.METRICS_COMPONENT_ID_PREFIX + className
+ "#" + occurrenceNum;
+ } else {
+ classOccurrencesMap.put(className, 1);
+ }
+ metricsConsumerBolts.put(id, metricsConsumerBolt);
+ }
+ }
+ return metricsConsumerBolts;
+ }
+
+    /**
+     * Add every bolt produced by {@code metricsConsumerBoltSpecs} to the topology.
+     *
+     * @param conf     topology configuration holding the consumer registrations
+     * @param topology the topology to modify in place
+     */
+    public static void addMetricComponents(Map conf, StormTopology topology) {
+        Map<String, Bolt> consumerSpecs = metricsConsumerBoltSpecs(conf, topology);
+        Iterator<Map.Entry<String, Bolt>> remaining = consumerSpecs.entrySet().iterator();
+        while (remaining.hasNext()) {
+            Map.Entry<String, Bolt> spec = remaining.next();
+            topology.put_to_bolts(spec.getKey(), spec.getValue());
+        }
+    }
+
+    /**
+     * Add the system bolt to the topology. It declares the system tick,
+     * metrics tick and credentials-changed streams, takes no inputs, and is
+     * created with a parallelism hint and a task count of 0.
+     *
+     * @param conf     topology configuration (not read by this method)
+     * @param topology the topology to modify in place
+     */
+    public static void addSystemComponents(Map conf, StormTopology topology) {
+        Map<String, StreamInfo> systemStreams = new HashMap<String, StreamInfo>();
+        systemStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+        systemStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+        systemStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
+
+        Map<String, Object> systemSettings = new HashMap<String, Object>();
+        systemSettings.put(Config.TOPOLOGY_TASKS, 0);
+
+        topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID,
+                Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), systemStreams, 0, systemSettings));
+    }
+
+ public static StormTopology systemTopology(Map stormConf,
StormTopology topology) throws InvalidTopologyException {
+ return _instance.systemTopologyImpl(stormConf, topology);
+ }
+
+ protected StormTopology systemTopologyImpl(Map stormConf,
StormTopology topology) throws InvalidTopologyException {
+ validateBasic(topology);
+
+ StormTopology ret = topology.deepCopy();
+ addAcker(stormConf, ret);
+ addEventLogger(stormConf, ret);
+ addMetricComponents(stormConf, ret);
+ addSystemComponents(stormConf, ret);
+ addMetricStreams(ret);
+ addSystemStreams(ret);
+
+ validateStructure(ret);
+
+ return ret;
+ }
+
+ public static boolean hasAckers(Map stormConf) {
--- End diff --
hasAckers and hasEventLoggers are only called in the executor, so they can be
moved there, or you can move them to ConfigUtils.java since these functions
operate solely on conf.
> port backtype.storm.daemon.common to java
> -----------------------------------------
>
> Key: STORM-1269
> URL: https://issues.apache.org/jira/browse/STORM-1269
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Basti Liu
> Labels: java-migration, jstorm-merger
>
> Common utils shared by the daemons (Some things should just use the Thrift
> object)
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java
> is similar but not exactly the same.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)