import java.io.File;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.globus.cog.abstraction.impl.common.AbstractionFactory;
import org.globus.cog.abstraction.impl.common.StatusEvent;
import org.globus.cog.abstraction.impl.common.task.ExecutionServiceImpl;
import org.globus.cog.abstraction.impl.common.task.IllegalSpecException;
import org.globus.cog.abstraction.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.abstraction.impl.common.task.InvalidServiceContactException;
import org.globus.cog.abstraction.impl.common.task.JobSpecificationImpl;
import org.globus.cog.abstraction.impl.common.task.ServiceContactImpl;
import org.globus.cog.abstraction.impl.common.task.TaskImpl;
import org.globus.cog.abstraction.impl.common.task.TaskSubmissionException;
import org.globus.cog.abstraction.interfaces.ExecutionService;
import org.globus.cog.abstraction.interfaces.JobSpecification;
import org.globus.cog.abstraction.interfaces.SecurityContext;
import org.globus.cog.abstraction.interfaces.ServiceContact;
import org.globus.cog.abstraction.interfaces.Status;
import org.globus.cog.abstraction.interfaces.StatusListener;
import org.globus.cog.abstraction.interfaces.Task;
import org.globus.cog.abstraction.interfaces.TaskHandler;
import org.globus.cog.abstraction.xml.TaskMarshaller;

/**
 * Example client that builds a job-submission {@link Task} from bean-style
 * properties, submits it through an execution task handler and reports every
 * status change on the console.
 *
 * <p>Typical use: set the properties, call {@link #prepareTask()} and then
 * submit the task (see {@link #main(String[])}). The instance registers itself
 * as the task's {@link StatusListener}.
 *
 * @author wilsonjr
 */
public class JobSubmission implements StatusListener {

    static Logger logger = Logger.getLogger(JobSubmission.class.getName());

    /** Name passed to the task created by {@link #prepareTask()}. */
    private String name = null;

    /**
     * Path of the XML file the task is marshalled to once it reaches the
     * SUBMITTED status; {@code null} disables checkpointing.
     */
    private String checkpointFile = null;

    /** The task built by {@link #prepareTask()} (or injected via the setter). */
    private Task task = null;

    /** Contact string of the remote service, e.g. {@code host:port}. */
    private String serviceContact = null;

    /** Provider name used to look up the security context and task handler. */
    private String provider = null;

    /** Whether the job specification is flagged as a batch job. */
    private boolean batch = false;

    /** Whether the job specification is flagged as redirected. */
    private boolean redirected;

    /** Executable to run; ignored when {@link #specification} is set. */
    private String executable = null;

    /** Argument string for the executable; ignored when {@link #specification} is set. */
    private String arguments = null;

    /** Comma-separated {@code name=value} list of environment variables. */
    private String environmentArg = null;

    /** Comma-separated {@code name=value} list of additional job attributes. */
    private String attributesArg = null;

    /** Working directory of the job; ignored when {@link #specification} is set. */
    private String directory = null;

    /** Names of the files used for the job's standard streams. */
    private String stderr, stdout, stdin;

    /**
     * When {@code true}, the JVM is terminated as soon as the task fails
     * (exit code 1) or completes (exit code 0).
     */
    private boolean commandLine = false;

    /** Job manager handed to the execution service (e.g. {@code PBS}). */
    private String jobmanager = null;

    /**
     * Complete, provider-specific job description. When non-null it is used
     * verbatim and all the individual job properties above are ignored.
     */
    private String specification = null;

    /**
     * Released by {@link #statusChanged(StatusEvent)} when the task fails or
     * completes, so that the submitting thread can stop waiting. Volatile
     * because the status callback may run on a different thread than the one
     * that prepared the task (NOTE(review): depends on the provider).
     */
    private volatile CountDownLatch terminalStatus = new CountDownLatch(1);

    /**
     * Builds a fresh job-submission task from the current property values,
     * attaches an execution service to it and registers this object as the
     * task's status listener.
     *
     * @throws IllegalStateException if no provider has been set
     * @throws Exception if the specification or the service cannot be created
     */
    public void prepareTask() throws Exception {
        if (this.provider == null) {
            throw new IllegalStateException(
                    "provider must be set before calling prepareTask()");
        }

        // A latch is single-use; every prepared task gets its own.
        this.terminalStatus = new CountDownLatch(1);

        /*
         * Create a new job submission task with the given task name.
         */
        this.task = new TaskImpl(this.name, Task.JOB_SUBMISSION);
        logger.debug("Task Identity: " + this.task.getIdentity().toString());

        this.task.setSpecification(buildSpecification());
        this.task.addService(buildService());

        /*
         * Add a task listener for this task. This allows the task to be
         * executed asynchronously. The client can continue with other
         * activities and gets asynchronously notified every time the status of
         * the task changes.
         */
        this.task.addStatusListener(this);
    }

    /**
     * Creates the job specification, either from the raw
     * {@link #specification} string or from the individual job properties.
     */
    private JobSpecification buildSpecification() throws Exception {
        JobSpecification spec = new JobSpecificationImpl();

        if (this.specification != null) {
            // A raw specification replaces every individual property.
            spec.setSpecification(this.specification);
            return spec;
        }

        spec.setExecutable(this.executable);

        if (this.arguments != null) {
            spec.setArguments(this.arguments);
        }
        if (this.environmentArg != null) {
            setEnvironment(spec);
        }
        if (this.directory != null) {
            spec.setDirectory(this.directory);
        }

        /*
         * If the task is a batch task, then this example returns immediately
         * after submission. Otherwise it will return only after the task is
         * completed or failed.
         */
        if (this.batch) {
            spec.setBatchJob(true);
        }

        /*
         * If the redirected flag is set to true, the stdout and stderr of the
         * task will be redirected to the local machine. Otherwise these will
         * be piped on the remote machine.
         */
        if (this.redirected) {
            spec.setRedirected(true);
        }

        /*
         * File names the standard streams of the task are piped to. For
         * example, if stdout = "output.txt" and redirected = true, then the
         * output file output.txt is available on the local machine.
         */
        if (this.stdout != null) {
            spec.setStdOutput(this.stdout);
        }
        if (this.stderr != null) {
            spec.setStdError(this.stderr);
        }
        if (this.stdin != null) {
            spec.setStdInput(this.stdin);
        }

        /*
         * All additional attributes that are not available as an API for the
         * JobSpecification interface can be provided as a task attribute. For
         * example the "count" parameter in a GT rsl is important for MPI jobs.
         * Since it is not a part of the JobSpecification interface, it can be
         * provided as a task attribute.
         */
        if (this.attributesArg != null) {
            setAttributes(spec);
        }
        return spec;
    }

    /**
     * Creates the execution service (provider, security context, service
     * contact and job manager) for the task.
     */
    private ExecutionService buildService() throws Exception {
        ExecutionService service = new ExecutionServiceImpl();
        // Fixed locale: the default-locale variant mangles 'I' under e.g. Turkish.
        service.setProvider(this.provider.toLowerCase(Locale.ENGLISH));

        SecurityContext securityContext = AbstractionFactory.newSecurityContext(this.provider);
        // NOTE(review): null presumably selects the provider's default
        // credentials - confirm against the provider in use.
        securityContext.setCredentials(null);
        service.setSecurityContext(securityContext);

        ServiceContact contact = new ServiceContactImpl(this.serviceContact);
        service.setServiceContact(contact);

        /*
         * This is an abstraction for the jobmanager. For example, the
         * servicecontact can be hot.anl.gov. One can specify different
         * jobmanagers for the same service contact, i.e if jobmanager = PBS
         * then the service is equivalent to hot.anl.gov/jobmanager-pbs
         */
        service.setJobManager(this.jobmanager);
        return service;
    }

    /**
     * Submits the prepared task and blocks until it fails or completes.
     *
     * <p>Any submission error is printed to the console and terminates the
     * JVM with exit code 1.
     *
     * @throws Exception if the task handler cannot be created or the waiting
     *         thread is interrupted
     */
    private void submitTask() throws Exception {
        TaskHandler handler = AbstractionFactory.newExecutionTaskHandler(provider);

        try {
            handler.submit(this.task);
        } catch (InvalidSecurityContextException ise) {
            abortSubmission("Security Exception: " + ise.getMessage(), ise);
        } catch (TaskSubmissionException tse) {
            abortSubmission("Submission Exception: " + tse.getMessage(), tse);
        } catch (IllegalSpecException ispe) {
            abortSubmission("Specification Exception: " + ispe.getMessage(), ispe);
        } catch (InvalidServiceContactException isce) {
            abortSubmission("Service Contact Exception", isce);
        }

        // Wait for a terminal status instead of sleeping forever; when
        // commandLine is set, statusChanged() exits the JVM before this returns.
        this.terminalStatus.await();
    }

    /** Prints a submission error, logs its stack trace and exits with code 1. */
    private static void abortSubmission(String message, Exception cause) {
        System.out.println(message);
        logger.debug("Stack trace: ", cause);
        System.exit(1);
    }

    /**
     * Writes the task as XML to the checkpoint file. Failures are logged, not
     * thrown.
     */
    public void marshal() {
        writeCheckpoint();
    }

    /**
     * Translates the task object into an XML file.
     *
     * @return {@code true} if the checkpoint was written, {@code false} if no
     *         checkpoint file is configured or marshalling failed
     */
    private boolean writeCheckpoint() {
        if (this.checkpointFile == null) {
            logger.error("Cannot marshal the task: no checkpoint file configured");
            return false;
        }
        try {
            File xmlFile = new File(this.checkpointFile);
            xmlFile.createNewFile();
            TaskMarshaller.marshal(this.task, xmlFile);
            return true;
        } catch (Exception e) {
            logger.error("Cannot marshal the task", e);
            return false;
        }
    }

    /**
     * Adds every {@code name=value} entry of the environment argument to the
     * specification as an environment variable.
     *
     * @throws IllegalArgumentException if an entry is not of the form
     *         {@code name=value}
     */
    private void setEnvironment(JobSpecification spec) {
        StringTokenizer entries = new StringTokenizer(getEnvironmentArg(), ",");
        while (entries.hasMoreTokens()) {
            String[] pair = splitAssignment(entries.nextToken(), "environment variable");
            if (pair != null) {
                spec.addEnvironmentVariable(pair[0], pair[1]);
            }
        }
    }

    /**
     * Sets every {@code name=value} entry of the attributes argument as an
     * attribute of the specification.
     *
     * @throws IllegalArgumentException if an entry is not of the form
     *         {@code name=value}
     */
    private void setAttributes(JobSpecification spec) {
        StringTokenizer entries = new StringTokenizer(getAttributesArg(), ",");
        while (entries.hasMoreTokens()) {
            String[] pair = splitAssignment(entries.nextToken(), "attribute");
            if (pair != null) {
                spec.setAttribute(pair[0], pair[1]);
            }
        }
    }

    /**
     * Splits one {@code name=value} entry at its first {@code '='} so that the
     * value itself may contain {@code '='}; name and value are trimmed and the
     * value may be empty.
     *
     * @param entry one element of a comma-separated list
     * @param kind  what the entry describes, used in the error message
     * @return a two-element array {@code {name, value}}, or {@code null} if
     *         the entry is blank
     * @throws IllegalArgumentException if the entry has no {@code '='} or an
     *         empty name
     */
    private static String[] splitAssignment(String entry, String kind) {
        String trimmed = entry.trim();
        if (trimmed.length() == 0) {
            return null;
        }
        int separator = trimmed.indexOf('=');
        if (separator <= 0) {
            throw new IllegalArgumentException("Malformed " + kind + " '"
                    + trimmed + "': expected name=value");
        }
        return new String[] {
            trimmed.substring(0, separator).trim(),
            trimmed.substring(separator + 1).trim()
        };
    }

    /**
     * Reacts to a status change of the task: checkpoints it on SUBMITTED and
     * reports the outcome on FAILED or COMPLETED. On a terminal status the JVM
     * exits when {@code commandLine} is set; otherwise the thread waiting in
     * the submission is released.
     *
     * @param event the event carrying the new status
     */
    public void statusChanged(StatusEvent event) {
        Status status = event.getStatus();
        logger.debug("Status changed to " + status.getStatusString());

        int code = status.getStatusCode();
        if (code == Status.SUBMITTED) {
            // Only claim success when the checkpoint was really written.
            if (this.checkpointFile != null && writeCheckpoint()) {
                System.out.println("Task checkpointed to file: "
                        + this.checkpointFile);
            }
        } else if (code == Status.FAILED) {
            reportFailure(status);
            finish(1);
        } else if (code == Status.COMPLETED) {
            reportCompletion();
            finish(0);
        }
    }

    /** Prints the failure message or, lacking one, the failure's stack trace. */
    private void reportFailure(Status status) {
        if (status.getMessage() != null) {
            System.out.println("Job failed: " + status.getMessage());
        } else if (status.getException() != null) {
            System.out.println("Job failed: ");
            status.getException().printStackTrace();
        } else {
            System.out.println("Job failed");
        }
    }

    /** Prints the completion notice and, for non-batch jobs, the captured output. */
    private void reportCompletion() {
        if (isBatch()) {
            System.out.println("Job Submitted");
            return;
        }
        System.out.println("Job completed");
        if (this.task.getStdOutput() != null) {
            System.out.println(this.task.getStdOutput());
        }
        if (this.task.getStdError() != null) {
            System.err.println(this.task.getStdError());
        }
    }

    /**
     * Ends the wait for a terminal status: exits the JVM in command-line mode,
     * otherwise wakes up the submitting thread.
     */
    private void finish(int exitCode) {
        if (this.commandLine) {
            System.exit(exitCode);
        }
        this.terminalStatus.countDown();
    }

    /**
     * Test driver: configures a submission with hard-coded values, prepares the
     * task and submits it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        System.out.println("Testando.");

        try {
            JobSubmission jobSubmission = new JobSubmission();

            System.out.println("jobSubmission.serviceContact = topgrid.dcc.ufba.br:8443");
            jobSubmission.setServiceContact("topgrid.dcc.ufba.br:8443");

            System.out.println("jobSubmission.provider = gt4.0.0");
            jobSubmission.setProvider("gt4.0.0");

            System.out.println("jobSubmission.jobManager = PBS");
            jobSubmission.setJobmanager("PBS");

            System.out.println("jobSubmission.name = myTestTask");
            jobSubmission.setName("myTestTask");

            System.out.println("jobSubmission.checkPointFile = null");
            jobSubmission.setCheckpointFile(null);

            System.out.println("jobSubmission.commandLine = true");
            jobSubmission.setCommandLine(true);

            System.out.println("jobSubmission.batch = true");
            jobSubmission.setBatch(true);

            System.out.println("jobSubmission.redirected = false");
            jobSubmission.setRedirected(false);

            System.out.println("jobSubmission.specification = null");
            jobSubmission.setSpecification(null);

            System.out.println("jobSubmission.executable = /bin/ls");
            jobSubmission.setExecutable("/bin/ls");

            // The message now shows the value that is actually set.
            System.out.println("jobSubmission.arguments = -l novo");
            jobSubmission.setArguments("-l novo");

            System.out.println("jobSubmission.enviromentArg = null");
            jobSubmission.setEnvironmentArg(null);

            System.out.println("jobSubmission.attributestArg = null");
            jobSubmission.setAttributesArg(null);

            System.out.println("jobSubmission.directory = /tmp");
            jobSubmission.setDirectory("/tmp");

            System.out.println("jobSubmission.stdout = arquivoSaida");
            jobSubmission.setStdout("arquivoSaida");

            System.out.println("jobSubmission.stderr = arquivoErro");
            jobSubmission.setStderr("arquivoErro");

            System.out.println("jobSubmission.stdin = novo");
            jobSubmission.setStdin("novo");

            // Set the raw specification string. Because it is non-null,
            // prepareTask() uses it instead of the individual job properties
            // configured above.
            String lSpecification =
                    "<job>" +
                    "\t<executable>my_bozorth3</executable>" +
                    "\t<directory>${GLOBUS_USER_HOME}</directory>" +
                    "\t<argument>minucia.xyt</argument>" +
                    "\t<argument>outraMinucia.xyt</argument>" +
                    "\t<stdout>${GLOBUS_USER_HOME}/stdout</stdout>" +
                    "\t<stderr>${GLOBUS_USER_HOME}/stderr</stderr>" +
                    "\t<fileStageIn>" +
                    "\t\t<transfer>" +
                    "\t\t\t<sourceUrl>gsiftp://topgrid.dcc.ufba.br:2811/usr/bin/bozorth3</sourceUrl>" +
                    "\t\t\t<destinationUrl>file:///${GLOBUS_USER_HOME}/my_bozorth3</destinationUrl>" +
                    "\t\t</transfer>" +
                    "\t\t<transfer>" +
                    "\t\t\t<sourceUrl>gsiftp://topgrid.dcc.ufba.br:2811/home/wilsonjr/Cog/minucia.xyt</sourceUrl>" +
                    "\t\t\t<destinationUrl>file:///${GLOBUS_USER_HOME}/minucia.xyt</destinationUrl>" +
                    "\t\t</transfer>" +
                    "\t\t<transfer>" +
                    "\t\t\t<sourceUrl>gsiftp://topgrid.dcc.ufba.br:2811/home/wilsonjr/Cog/outraMinucia.xyt</sourceUrl>" +
                    "\t\t\t<destinationUrl>file:///${GLOBUS_USER_HOME}/outraMinucia.xyt</destinationUrl>" +
                    "\t\t</transfer>" +
                    "\t</fileStageIn>" +
                    "</job>";

            jobSubmission.setSpecification(lSpecification);

            System.out.println("jobSubmission.prepareTask();");
            jobSubmission.prepareTask();

            System.out.println("jobSubmission.submitTask();");
            jobSubmission.submitTask();

            System.out.println("Tarefa submetida. Bye.");

        } catch (Exception e) {
            logger.error("Exception in main", e);
            e.printStackTrace();
        }
    }

    // Accessor methods

    /**
     * @return the name
     */
    public String getName() {
        return name;
    }

    /**
     * @param name the name to set
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the checkpointFile
     */
    public String getCheckpointFile() {
        return checkpointFile;
    }

    /**
     * @param checkpointFile the checkpointFile to set
     */
    public void setCheckpointFile(String checkpointFile) {
        this.checkpointFile = checkpointFile;
    }

    /**
     * @return the task
     */
    public Task getTask() {
        return task;
    }

    /**
     * @param task the task to set
     */
    public void setTask(Task task) {
        this.task = task;
    }

    /**
     * @return the serviceContact
     */
    public String getServiceContact() {
        return serviceContact;
    }

    /**
     * @param serviceContact the serviceContact to set
     */
    public void setServiceContact(String serviceContact) {
        this.serviceContact = serviceContact;
    }

    /**
     * @return the provider
     */
    public String getProvider() {
        return provider;
    }

    /**
     * @param provider the provider to set
     */
    public void setProvider(String provider) {
        this.provider = provider;
    }

    /**
     * @return the batch
     */
    public boolean isBatch() {
        return batch;
    }

    /**
     * @param batch the batch to set
     */
    public void setBatch(boolean batch) {
        this.batch = batch;
    }

    /**
     * @return the redirected
     */
    public boolean isRedirected() {
        return redirected;
    }

    /**
     * @param redirected the redirected to set
     */
    public void setRedirected(boolean redirected) {
        this.redirected = redirected;
    }

    /**
     * @return the executable
     */
    public String getExecutable() {
        return executable;
    }

    /**
     * @param executable the executable to set
     */
    public void setExecutable(String executable) {
        this.executable = executable;
    }

    /**
     * @return the arguments
     */
    public String getArguments() {
        return arguments;
    }

    /**
     * @param arguments the arguments to set
     */
    public void setArguments(String arguments) {
        this.arguments = arguments;
    }

    /**
     * @return the environment
     */
    public String getEnvironmentArg() {
        return environmentArg;
    }

    /**
     * @param environmentArg the environment to set
     */
    public void setEnvironmentArg(String environmentArg) {
        this.environmentArg = environmentArg;
    }

    /**
     * @return the attributes
     */
    public String getAttributesArg() {
        return attributesArg;
    }

    /**
     * @param attributesArg the attributes to set
     */
    public void setAttributesArg(String attributesArg) {
        this.attributesArg = attributesArg;
    }

    /**
     * @return the directory
     */
    public String getDirectory() {
        return directory;
    }

    /**
     * @param directory the directory to set
     */
    public void setDirectory(String directory) {
        this.directory = directory;
    }

    /**
     * @return the stderr
     */
    public String getStderr() {
        return stderr;
    }

    /**
     * @param stderr the stderr to set
     */
    public void setStderr(String stderr) {
        this.stderr = stderr;
    }

    /**
     * @return the stdout
     */
    public String getStdout() {
        return stdout;
    }

    /**
     * @param stdout the stdout to set
     */
    public void setStdout(String stdout) {
        this.stdout = stdout;
    }

    /**
     * @return the stdin
     */
    public String getStdin() {
        return stdin;
    }

    /**
     * @param stdin the stdin to set
     */
    public void setStdin(String stdin) {
        this.stdin = stdin;
    }

    /**
     * @return the commandLine
     */
    public boolean isCommandLine() {
        return commandLine;
    }

    /**
     * @param commandLine the commandLine to set
     */
    public void setCommandLine(boolean commandLine) {
        this.commandLine = commandLine;
    }

    /**
     * @return the jobmanager
     */
    public String getJobmanager() {
        return jobmanager;
    }

    /**
     * @param jobmanager the jobmanager to set
     */
    public void setJobmanager(String jobmanager) {
        this.jobmanager = jobmanager;
    }

    /**
     * @return the specification
     */
    public String getSpecification() {
        return specification;
    }

    /**
     * @param specification the specification to set
     */
    public void setSpecification(String specification) {
        this.specification = specification;
    }

}
