This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new 8f7dc3d Adding consistent job submission, fixing zookeeper connection
issues and logging improvements
8f7dc3d is described below
commit 8f7dc3dc8889bd21cb00911d323a66721a960c81
Author: dimuthu <[email protected]>
AuthorDate: Tue Apr 3 15:53:24 2018 -0400
Adding consistent job submission, fixing zookeeper connection issues and
logging improvements
---
.../task/cancel/RemoteJobCancellationTask.java | 20 +------
.../impl/task/cancel/WorkflowCancellationTask.java | 21 +++++--
.../task/submission/DefaultJobSubmissionTask.java | 10 +---
.../impl/task/submission/JobSubmissionTask.java | 69 +++++++++++++++-------
.../helix/impl/workflow/PostWorkflowManager.java | 12 ++--
.../apache/airavata/helix/core/AbstractTask.java | 23 ++++++++
.../airavata/helix/workflow/WorkflowManager.java | 2 +-
7 files changed, 98 insertions(+), 59 deletions(-)
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index 20813b0..2302233 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -2,8 +2,6 @@ package org.apache.airavata.helix.impl.task.cancel;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.CommandOutput;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
@@ -11,10 +9,6 @@ import
org.apache.airavata.helix.impl.task.submission.config.JobManagerConfigura
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
@@ -27,19 +21,9 @@ public class RemoteJobCancellationTask extends AiravataTask {
private final static Logger logger =
LoggerFactory.getLogger(RemoteJobCancellationTask.class);
- private CuratorFramework curatorClient = null;
-
@Override
public void init(HelixManager manager, String workflowName, String
jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- try {
- this.curatorClient =
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(),
retryPolicy);
- this.curatorClient.start();
- } catch (ApplicationSettingsException e) {
- logger.error("Failed to create curator client ", e);
- throw new RuntimeException(e);
- }
}
@Override
@@ -108,8 +92,8 @@ public class RemoteJobCancellationTask extends AiravataTask {
private List<String> getJobsOfProcess(String processId) throws Exception {
String path = "/registry/" + processId + "/jobs";
- if (this.curatorClient.checkExists().forPath(path) != null) {
- return this.curatorClient.getChildren().forPath(path);
+ if (getCuratorClient().checkExists().forPath(path) != null) {
+ return getCuratorClient().getChildren().forPath(path);
} else {
return null;
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
index 0513327..95b256c 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/WorkflowCancellationTask.java
@@ -20,6 +20,7 @@ public class WorkflowCancellationTask extends AbstractTask {
private final static Logger logger =
LoggerFactory.getLogger(WorkflowCancellationTask.class);
private TaskDriver taskDriver;
+ private HelixManager helixManager;
@TaskParam(name = "Cancelling Workflow")
private String cancellingWorkflowName;
@@ -33,16 +34,15 @@ public class WorkflowCancellationTask extends AbstractTask {
try {
- HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(ServerSettings.getSetting("helix.cluster.name"),
taskName,
+ helixManager =
HelixManagerFactory.getZKHelixManager(ServerSettings.getSetting("helix.cluster.name"),
taskName,
InstanceType.SPECTATOR,
ServerSettings.getZookeeperConnection());
helixManager.connect();
Runtime.getRuntime().addShutdownHook(
- new Thread() {
- @Override
- public void run() {
+ new Thread(() -> {
+ if (helixManager.isConnected()) {
helixManager.disconnect();
}
- }
+ })
);
taskDriver = new TaskDriver(helixManager);
} catch (Exception e) {
@@ -74,6 +74,17 @@ public class WorkflowCancellationTask extends AbstractTask {
} catch (Exception e) {
logger.error("Failed to stop workflow " + cancellingWorkflowName,
e);
return onFail("Failed to stop workflow " + cancellingWorkflowName
+ ": " + e.getMessage(), true);
+ } finally {
+
+ try {
+ if (helixManager != null) {
+ if (helixManager.isConnected()) {
+ helixManager.disconnect();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to disconnect helix manager", e);
+ }
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index a9e2915..a85abfa 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -89,8 +89,7 @@ public class DefaultJobSubmissionTask extends
JobSubmissionTask {
statusList.get(0).setReason(submissionOutput.getFailureReason());
jobModel.setJobStatuses(statusList);
saveJobModel(jobModel);
- logger.error("expId: " + getExperimentId() + ", processid:
" + getProcessId()+ ", taskId: " +
- getTaskId() + " :- Job submission failed for job
name " + jobModel.getJobName()
+ logger.error("Job submission failed for job name " +
jobModel.getJobName()
+ ". Exit code : " +
submissionOutput.getExitCode() + ", Submission failed : "
+ submissionOutput.isJobSubmissionFailed());
@@ -100,19 +99,16 @@ public class DefaultJobSubmissionTask extends
JobSubmissionTask {
false, null);
} else {
-
String msg;
saveJobModel(jobModel);
ErrorModel errorModel = new ErrorModel();
if (submissionOutput.getExitCode() != Integer.MIN_VALUE) {
- msg = "expId:" + getExperimentId() + ", processId:" +
getProcessId() + ", taskId: " + getTaskId() +
- " return non zero exit code:" +
submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
+ msg = "Returned non zero exit code:" +
submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() +
", with failure reason : " +
submissionOutput.getFailureReason()
+ " Hence changing job state to Failed." ;
errorModel.setActualErrorMessage(submissionOutput.getFailureReason());
} else {
- msg = "expId:" + getExperimentId() + ", processId:" +
getProcessId() + ", taskId: " + getTaskId() +
- " doesn't return valid job submission exit
code for JobName:" + jobModel.getJobName() +
+ msg = "Didn't return valid job submission exit code
for JobName:" + jobModel.getJobName() +
", with failure reason : stdout ->" +
submissionOutput.getStdOut() +
" stderr -> " + submissionOutput.getStdErr() +
" Hence changing job state to Failed." ;
errorModel.setActualErrorMessage(msg);
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 16869bb..d63c459 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -20,6 +20,7 @@
package org.apache.airavata.helix.impl.task.submission;
import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.CommandOutput;
import org.apache.airavata.agents.api.JobSubmissionOutput;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -38,10 +39,6 @@ import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.cpi.*;
import org.apache.commons.io.FileUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.helix.HelixManager;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
@@ -55,24 +52,9 @@ public abstract class JobSubmissionTask extends AiravataTask
{
private final static Logger logger =
LoggerFactory.getLogger(JobSubmissionTask.class);
- private CuratorFramework curatorClient = null;
-
@Override
public void init(HelixManager manager, String workflowName, String
jobName, String taskName) {
super.init(manager, workflowName, jobName, taskName);
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- try {
- this.curatorClient =
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(),
retryPolicy);
- this.curatorClient.start();
- } catch (ApplicationSettingsException e) {
- logger.error("Failed to create curator client ", e);
- throw new RuntimeException(e);
- }
- }
-
- @SuppressWarnings("WeakerAccess")
- public CuratorFramework getCuratorClient() {
- return curatorClient;
}
// TODO perform exception handling
@@ -106,6 +88,7 @@ public abstract class JobSubmissionTask extends AiravataTask
{
addMonitoringCommands(groovyMapData);
String scriptAsString =
groovyMapData.getAsString(jobManagerConfiguration.getJobDescriptionTemplateName());
+ logger.info("Generated job submission script : " + scriptAsString);
int number = new SecureRandom().nextInt();
number = (number < 0 ? -number : number);
@@ -117,13 +100,15 @@ public abstract class JobSubmissionTask extends
AiravataTask {
logger.info("Copying file form " + tempJobFile.getAbsolutePath() + "
to remote path " + workingDirectory +
" of compute resource " +
getTaskContext().getComputeResourceId());
agentAdaptor.copyFileTo(tempJobFile.getAbsolutePath(),
workingDirectory);
- // TODO transfer file
+
RawCommandInfo submitCommand =
jobManagerConfiguration.getSubmitCommand(workingDirectory,
tempJobFile.getPath());
- logger.debug("Submit command for process id " + getProcessId() + " : "
+ submitCommand.getRawCommand());
+ logger.info("Submit command for process id " + getProcessId() + " : "
+ submitCommand.getRawCommand());
logger.debug("Working directory for process id " + getProcessId() + "
: " + workingDirectory);
- CommandOutput commandOutput =
agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
+ CommandOutput commandOutput =
submitCommandWithRecording(submitCommand, agentAdaptor, groovyMapData,
workingDirectory);
+ logger.info("Job " + groovyMapData.getJobName() + " submitted to
compute resource");
+ logger.info("Submission stdout: " + commandOutput.getStdOut() + ",
stderr: " + commandOutput.getStdError());
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
jsoutput.setDescription(scriptAsString);
@@ -147,6 +132,46 @@ public abstract class JobSubmissionTask extends
AiravataTask {
return jsoutput;
}
+ /**
+ * This will write the standard output of the command to a file inside the
working directory of the process and
+ * if the agent does not receive the output through first invocation, it
retries by looking into the output file.
+ *
+ * @param submitCommand command to submit
+ * @param agentAdaptor agent adaptor to communicate with compute resource
+ * @param groovyMapData metadata object of the job
+ * @param workingDirectory working directory for the process
+ * @return {@link CommandOutput} of the submitted command
+ * @throws AgentException if agent failed to communicate with the compute
host
+ */
+ private CommandOutput submitCommandWithRecording(RawCommandInfo
submitCommand, AgentAdaptor agentAdaptor,
+ GroovyMapData
groovyMapData, String workingDirectory) throws AgentException {
+
+ String modifiedCommand = submitCommand.getCommand() + " | tee " +
getJobCommandRecordingFile(groovyMapData);
+ logger.info("Modified the submit command to support recording : " +
modifiedCommand);
+
+ CommandOutput commandOutput =
agentAdaptor.executeCommand(modifiedCommand, workingDirectory);
+
+ if (commandOutput.getStdOut() == null ||
"".equals(commandOutput.getStdOut())) {
+ logger.warn("command submission returned empty response so reading
recording file at " + getJobCommandRecordingFile(groovyMapData));
+ CommandOutput recordingFileReadCommandOutput =
agentAdaptor.executeCommand("cat " + getJobCommandRecordingFile(groovyMapData),
+ groovyMapData.getWorkingDirectory());
+ if (recordingFileReadCommandOutput.getStdOut() != null &&
!"".equals(recordingFileReadCommandOutput.getStdOut())) {
+ logger.info("Received non empty output form recording file : "
+ recordingFileReadCommandOutput.getStdOut());
+ return recordingFileReadCommandOutput;
+ } else {
+ return commandOutput;
+ }
+ } else {
+ return commandOutput;
+ }
+ }
+
+ private String getJobCommandRecordingFile(GroovyMapData mapData) {
+ return (mapData.getWorkingDirectory().endsWith(File.separator) ?
+ mapData.getWorkingDirectory() : mapData.getWorkingDirectory()
+ File.separator) +
+ mapData.getJobName();
+ }
+
@SuppressWarnings("WeakerAccess")
public File getLocalDataDir() {
String outputPath = ServerSettings.getLocalDataLocation();
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index e68b526..5fb2221 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -72,11 +72,16 @@ public class PostWorkflowManager {
private CuratorFramework curatorClient = null;
private Publisher statusPublisher;
+ private WorkflowManager workflowManager;
- private void init() throws ApplicationSettingsException {
+ private void init() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
this.curatorClient =
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(),
retryPolicy);
this.curatorClient.start();
+ workflowManager = new WorkflowManager(
+ ServerSettings.getSetting("helix.cluster.name"),
+ ServerSettings.getSetting("post.workflow.manager.name"),
+ ServerSettings.getZookeeperConnection());
}
private Consumer<String, JobStatusResult> createConsumer() throws
ApplicationSettingsException {
@@ -276,11 +281,6 @@ public class PostWorkflowManager {
}
allTasks.add(completingTask);
- WorkflowManager workflowManager = new WorkflowManager(
-
ServerSettings.getSetting("helix.cluster.name"),
-
ServerSettings.getSetting("post.workflow.manager.name"),
- ServerSettings.getZookeeperConnection());
-
String workflowName =
workflowManager.launchWorkflow(processId + "-POST-" +
UUID.randomUUID().toString(),
new ArrayList<>(allTasks), true, false);
try {
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
index b7304d7..fa9a205 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java
@@ -19,10 +19,16 @@
*/
package org.apache.airavata.helix.core;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.core.util.TaskUtil;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskOutPort;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
@@ -44,6 +50,8 @@ public abstract class AbstractTask extends UserContentStore
implements Task {
private static final String NEXT_JOB = "next-job";
private static final String WORKFLOW_STARTED = "workflow-started";
+ private static CuratorFramework curatorClient = null;
+
@TaskParam(name = "taskId")
private String taskId;
@@ -160,4 +168,19 @@ public abstract class AbstractTask extends
UserContentStore implements Task {
public void setNextTask(OutPort nextTask) {
this.nextTask = nextTask;
}
+
+ protected synchronized CuratorFramework getCuratorClient() {
+
+ if (curatorClient == null) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ try {
+ this.curatorClient =
CuratorFrameworkFactory.newClient(ServerSettings.getZookeeperConnection(),
retryPolicy);
+ this.curatorClient.start();
+ } catch (ApplicationSettingsException e) {
+ logger.error("Failed to create curator client ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ return curatorClient;
+ }
}
diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
index df558f2..3197a47 100644
--- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java
@@ -65,7 +65,7 @@ public class WorkflowManager {
taskDriver = new TaskDriver(helixManager);
}
- public String launchWorkflow(String processId, List<AbstractTask> tasks,
boolean globalParticipant, boolean monitor) throws Exception {
+ public synchronized String launchWorkflow(String processId,
List<AbstractTask> tasks, boolean globalParticipant, boolean monitor) throws
Exception {
String workflowName = WORKFLOW_PREFIX + processId;
logger.info("Launching workflow " + workflowName + " for process " +
processId);
--
To stop receiving notification emails like this one, please contact
[email protected].