[
https://issues.apache.org/jira/browse/STORM-1864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299573#comment-15299573
]
ASF GitHub Bot commented on STORM-1864:
---------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1446#discussion_r64523789
--- Diff: storm-core/src/jvm/org/apache/storm/StormSubmitter.java ---
@@ -223,48 +227,60 @@ public static void submitTopologyAs(String name, Map
stormConf, StormTopology to
// this is for backwards compatibility
localNimbus.submitTopology(name, stormConf, topology);
}
+ LOG.info("Finished submitting topology: " + name);
} else {
String serConf = JSONValue.toJSONString(stormConf);
- NimbusClient client =
NimbusClient.getConfiguredClientAs(conf, asUser);
if(topologyNameExists(conf, name, asUser)) {
throw new RuntimeException("Topology with name `" +
name + "` already exists on cluster");
}
String jar = submitJarAs(conf,
System.getProperty("storm.jar"), progressListener, asUser);
- try {
- LOG.info("Submitting topology " + name + " in
distributed mode with conf " + serConf);
- if(opts!=null) {
+ try (NimbusClient client =
NimbusClient.getConfiguredClientAs(conf, asUser)) {
+ LOG.info("Submitting topology " + name + " in
distributed mode with conf " + serConf);
+ if (opts != null) {
client.getClient().submitTopologyWithOpts(name,
jar, serConf, topology, opts);
} else {
// this is for backwards compatibility
client.getClient().submitTopology(name, jar,
serConf, topology);
}
- } catch(InvalidTopologyException e) {
- LOG.warn("Topology submission exception:
"+e.get_msg());
+ LOG.info("Finished submitting topology: " + name);
+ } catch (InvalidTopologyException e) {
+ LOG.warn("Topology submission exception: " +
e.get_msg());
throw e;
- } catch(AlreadyAliveException e) {
+ } catch (AlreadyAliveException e) {
LOG.warn("Topology already alive exception", e);
throw e;
- } finally {
- client.close();
}
}
- LOG.info("Finished submitting topology: " + name);
} catch(TException e) {
throw new RuntimeException(e);
}
invokeSubmitterHook(name, asUser, conf, topology);
}
+ /**
+ *
+ * @param name
+ * @param asUser
+ * @param stormConf
+ * @param topology
+ *
+ * @thorws SubmitterHookException This is thrown when any Exception
occurs during initialization or invocation of registered {@link ISubmitterHook}
+ */
private static void invokeSubmitterHook(String name, String asUser,
Map stormConf, StormTopology topology) {
+ String submissionNotifierClassName = null;
try {
if
(stormConf.containsKey(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN)) {
- ISubmitterHook submitterHook = (ISubmitterHook)
Class.forName(stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString()).newInstance();
+ submissionNotifierClassName =
stormConf.get(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN).toString();
+ LOG.info("Initializing the registered ISubmitterHook
[{}]", submissionNotifierClassName);
+ ISubmitterHook submitterHook = (ISubmitterHook)
Class.forName(submissionNotifierClassName).newInstance();
TopologyInfo topologyInfo = Utils.getTopologyInfo(name,
asUser, stormConf);
+ LOG.info("Invoking the registered ISubmitterHook [{}]",
submissionNotifierClassName);
submitterHook.notify(topologyInfo, stormConf, topology);
}
} catch (Exception e) {
- throw new RuntimeException(e);
+ LOG.warn("Error occurred in invoking submitter hook:
"+submissionNotifierClassName, e);
--- End diff --
Minor. can use {} notation instead of concatenation.
> StormSubmitter should show proper error messages for submitter hook
> invocation.
> -------------------------------------------------------------------------------
>
> Key: STORM-1864
> URL: https://issues.apache.org/jira/browse/STORM-1864
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core, storm-hive
> Reporter: Satish Duggana
> Assignee: Satish Duggana
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)