Repository: airavata Updated Branches: refs/heads/master ac3be7ae4 -> 2d9ea1f4a
ssh job submission task impl Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2d9ea1f4 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2d9ea1f4 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2d9ea1f4 Branch: refs/heads/master Commit: 2d9ea1f4a1394bffb2f90f847c3d25ca32946e57 Parents: ac3be7a Author: Chathuri Wimalasena <[email protected]> Authored: Thu Jun 18 11:24:41 2015 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Thu Jun 18 11:24:41 2015 -0400 ---------------------------------------------------------------------- .../apache/airavata/common/utils/Constants.java | 5 -- .../store/server/CredentialStoreServer.java | 4 +- .../credential/store/client/TestSSLClient.java | 4 +- .../apache/airavata/gfac/core/GFacUtils.java | 77 +++++++++++--------- .../gfac/impl/task/SSHJobSubmissionTask.java | 71 +++++++++++++++++- 5 files changed, 116 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java index 83f0cc5..6e1cb84 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java @@ -36,9 +36,4 @@ public final class Constants { public static final String REMOTE_OAUTH_SERVER_URL = "remote.oauth.authorization.server"; public static final String ADMIN_USERNAME = "admin.user.name"; public static final String ADMIN_PASSWORD = "admin.password"; - - public static final String PBS_JOB_MANAGER = "pbs"; - public static final String SLURM_JOB_MANAGER = "slurm"; - public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE"; - public static final String LSF_JOB_MANAGER = "LSF"; } http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java ---------------------------------------------------------------------- diff --git a/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java b/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java index 9fa3adb..3439e00 100644 --- a/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java +++ b/modules/credential-store/credential-store-service/src/main/java/org/apache/airavata/credential/store/server/CredentialStoreServer.java @@ -66,8 +66,8 @@ public class CredentialStoreServer implements IServer { new TSSLTransportFactory.TSSLTransportParameters(); String keystorePath = ServerSettings.getCredentialStoreThriftServerKeyStorePath(); String keystorePWD = ServerSettings.getCredentialStoreThriftServerKeyStorePassword(); - final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_PORT, "8960")); - final String serverHost = ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_HOST, null); + final int serverPort = Integer.parseInt(ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_PORT, "8960")); + final String serverHost = ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_HOST, null); params.setKeyStore(keystorePath, keystorePWD); TServerSocket serverTransport = TSSLTransportFactory.getServerSocket(serverPort, 100, InetAddress.getByName(serverHost), params); http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java ---------------------------------------------------------------------- diff --git a/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java b/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java index 2c93c71..fa19ea4 100644 --- a/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java +++ b/modules/credential-store/credential-store-stubs/src/main/java/org/apache/airavata/credential/store/client/TestSSLClient.java @@ -55,8 +55,8 @@ public class TestSSLClient { String keystorePath = ServerSettings.getCredentialStoreThriftServerKeyStorePath(); String keystorePWD = ServerSettings.getCredentialStoreThriftServerKeyStorePassword(); params.setTrustStore(keystorePath, keystorePWD); - final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_PORT, "8960")); - final String serverHost = ServerSettings.getSetting(Constants.CREDENTIAL_SERVER_HOST, null); + final int serverPort = Integer.parseInt(ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_PORT, "8960")); + final String serverHost = ServerSettings.getSetting(ServerSettings.CREDENTIAL_SERVER_HOST, null); transport = TSSLTransportFactory.getClientSocket(serverHost, serverPort, 10000, params); TProtocol protocol = new TBinaryProtocol(transport); http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index e630570..8c08940 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -37,11 +37,17 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; import org.apache.airavata.model.status.ExperimentState; import org.apache.airavata.model.status.ExperimentStatus; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.*; import org.apache.commons.io.FileUtils; @@ -230,27 +236,29 @@ public class GFacUtils { return buf.toString(); } -// public static void saveJobStatus(JobExecutionContext jobExecutionContext, -// JobDetails details, JobState state) throws GFacException { -// try { -// // first we save job details to the registry for sa and then save the job status. -// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog(); -// JobStatus status = new JobStatus(); -// status.setJobState(state); -// details.setJobStatus(status); -// experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details, + public static void saveJobStatus(ProcessContext processContext, + JobModel jobModel, JobState state) throws GFacException { + try { + // first we save job jobModel to the registry for sa and then save the job status. + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + JobStatus status = new JobStatus(); + status.setJobState(state); + jobModel.setJobStatus(status); + // FIXME - Should change according to the experiment catalog impl +// experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, jobModel, // new CompositeIdentifier(jobExecutionContext.getTaskData() -// .getTaskID(), details.getJobID())); -// JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(), -// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), -// jobExecutionContext.getGatewayID()); -// JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier); -// jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent); -// } catch (Exception e) { -// throw new GFacException("Error persisting job status" -// + e.getLocalizedMessage(), e); -// } -// } +// .getTaskID(), jobModel.getJobID())); + // FIXME - Routing keys might need to identify according to new data models + JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), null, + processContext.getProcessId(), processContext.getProcessModel().getExperimentId(), + processContext.getGatewayId()); + JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier); + processContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent); + } catch (Exception e) { + throw new GFacException("Error persisting job status" + + e.getLocalizedMessage(), e); + } + } // public static void updateJobStatus(JobExecutionContext jobExecutionContext, // JobDetails details, JobState state) throws GFacException { @@ -270,21 +278,22 @@ public class GFacUtils { // } // } -// public static void saveErrorDetails( -// JobExecutionContext jobExecutionContext, String errorMessage) -// throws GFacException { -// try { -// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog(); -// ErrorModel details = new ErrorModel(); -// details.setActualErrorMessage(errorMessage); -// details.setCreationTime(Calendar.getInstance().getTimeInMillis()); -// experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details, + public static void saveErrorDetails( + ProcessContext processContext, String errorMessage) + throws GFacException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + ErrorModel details = new ErrorModel(); + details.setActualErrorMessage(errorMessage); + details.setCreationTime(Calendar.getInstance().getTimeInMillis()); + // FIXME : Save error model according to new data model +// experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details, // jobExecutionContext.getTaskData().getTaskID()); -// } catch (Exception e) { -// throw new GFacException("Error persisting job status" -// + e.getLocalizedMessage(), e); -// } -// } + } catch (Exception e) { + throw new GFacException("Error persisting job status" + + e.getLocalizedMessage(), e); + } + } public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException { Map<String, Object> map = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/airavata/blob/2d9ea1f4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java index f14502f..9fb6aae 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java @@ -23,22 +23,28 @@ package org.apache.airavata.gfac.impl.task; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.LocalEventPublisher; import org.apache.airavata.gfac.core.*; import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.Map; public class SSHJobSubmissionTask implements JobSubmissionTask { @@ -84,9 +90,48 @@ public class SSHJobSubmissionTask implements JobSubmissionTask { } File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig); if (jobFile != null && jobFile.exists()){ + jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir()); - } + if (jobId != null && !jobId.isEmpty()) { + jobModel.setJobId(jobId); + GFacUtils.saveJobStatus(processContext, jobModel, JobState.SUBMITTED); +// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) +// , GfacExperimentState.JOBSUBMITTED)); + processContext.setJobModel(jobModel); + if (verifyJobSubmissionByJobId(remoteCluster, jobId)) { +// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) +// , GfacExperimentState.JOBSUBMITTED)); + GFacUtils.saveJobStatus(processContext, jobModel, JobState.QUEUED); + } + } else { + processContext.setJobModel(jobModel); + int verificationTryCount = 0; + while (verificationTryCount++ < 3) { + String verifyJobId = verifyJobSubmission(remoteCluster, jobModel); + if (verifyJobId != null && !verifyJobId.isEmpty()) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + jobId = verifyJobId; + jobModel.setJobId(jobId); +// publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) +// , GfacExperimentState.JOBSUBMITTED)); + GFacUtils.saveJobStatus(processContext, jobModel, JobState.QUEUED); + break; + } + Thread.sleep(verificationTryCount * 1000); + } + } + if (jobId == null || jobId.isEmpty()) { + String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find remote jobId for JobName:" + + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed"; + log.error(msg); + GFacUtils.saveErrorDetails(processContext, msg); + // FIXME : Need to handle according to status update chain +// GFacUtils.publishTaskStatus(jobExecutionContext, publisher, TaskState.FAILED); + return TaskState.FAILED; + } + } + return TaskState.EXECUTING; } catch (AppCatalogException e) { log.error("Error while instatiating app catalog",e); throw new TaskException("Error while instatiating app catalog", e); @@ -99,10 +144,32 @@ public class SSHJobSubmissionTask implements JobSubmissionTask { } catch (SSHApiException e) { log.error("Error occurred while submitting the job", e); throw new TaskException("Error occurred while submitting the job", e); + } catch (IOException e) { + log.error("Error while reading the content of the job file", e); + throw new TaskException("Error while reading the content of the job file", e); + } catch (InterruptedException e) { + log.error("Error occurred while verifying the job submission", e); + throw new TaskException("Error occurred while verifying the job submission", e); } - return null; } + private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException { + JobStatus status = remoteCluster.getJobStatus(jobID); + return status != null && status.getJobState() != JobState.UNKNOWN; + } + + private String verifyJobSubmission(RemoteCluster remoteCluster, JobModel jobDetails) { + String jobName = jobDetails.getJobName(); + String jobId = null; + try { + jobId = remoteCluster.getJobIdByJobName(jobName, remoteCluster.getServerInfo().getUserName()); + } catch (SSHApiException e) { + log.error("Error while verifying JobId from JobName"); + } + return jobId; + } + + @Override public TaskState recover(TaskContext taskContext) throws TaskException { return null;
