#
# Marko Krznaric
# London eScience Centre
# June 2003 
#
# Contributions by David McBride
# London eScience Centre
# Oct 2003
#
# Modified by
#   AIST Grid Technology Research Center, Japan
#     and
#   Asia Pacific Science and Technology Center, Singapore
# Oct 2004, Apr 2006
#

use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;

use IO::File;
use Config;
use POSIX;

package Globus::GRAM::JobManager::sge;

@ISA = ('Globus::GRAM::JobManager');

# Site configuration shared by submit(), poll() and cancel().  The variables
# are file-scoped lexicals that are filled in once, at compile time, by the
# BEGIN block below.
my (
    # Grid Engine client programs
    $qsub, $qstat, $qdel, $qselect, $qhost, $qconf, $qacct,
    # MPI launchers, default parallel environment, helpers, job-type pattern
    $mpirun, $sun_mprun, $mpi_pe, $cat, $supported_job_types,
    # Grid Engine installation details
    $SGE_ROOT, $SGE_CELL, $SGE_MODE, $SGE_RELEASE, $SGE_QMASTER_PORT,
);

BEGIN
{
    # Absolute paths of the Grid Engine command line tools.
    ($qsub, $qstat, $qdel, $qselect, $qhost, $qconf, $qacct) = (
        '/opt/gridengine/bin/lx26-amd64/qsub',
        '/opt/gridengine/bin/lx26-amd64/qstat',
        '/opt/gridengine/bin/lx26-amd64/qdel',
        '/opt/gridengine/bin/lx26-amd64/qselect',
        '/opt/gridengine/bin/lx26-amd64/qhost',
        '/opt/gridengine/bin/lx26-amd64/qconf',
        '/opt/gridengine/bin/lx26-amd64/qacct',
    );

    # MPI launchers ("no" marks a launcher as unavailable) and the default
    # parallel environment (empty means none is configured).
    ($mpirun, $sun_mprun, $mpi_pe) = ('/opt/lam/gnu/bin/mpirun', 'no', '');

    # "mpi" is only offered as a job type when at least one launcher exists.
    $supported_job_types = ($mpirun ne "no" || $sun_mprun ne "no")
                         ? "(mpi|single|multiple)"
                         : "(single|multiple)";

    $cat = '/bin/cat';

    # Grid Engine installation: root directory, cell, edition, release and
    # the qmaster port exported to the client programs.
    ($SGE_ROOT, $SGE_CELL, $SGE_MODE, $SGE_RELEASE, $SGE_QMASTER_PORT) =
        ('/opt/gridengine', 'default', 'SGE', '6.0u8', '536');
}


#########################################################################
#
# SUBMIT
#
########################################################################
# submit()
#
# Validates the job description held in $self->{JobDescription}, writes a
# Grid Engine batch script for it, hands the script to qsub and reports the
# job id that qsub printed.
#
# Returns:
#   { JOB_ID => <id>, JOB_STATE => PENDING } when qsub exits with status 0;
#   a Globus::GRAM::Error value when validation, script creation or the
#   submission itself fails.
#
# Side effects:
#   - chdir()s into the directory named by the job description;
#   - writes scheduler_sge_job_script (and, for "multiple" jobs,
#     scheduler_sge_cmd_script) below $self->job_dir();
#   - sets SGE_ROOT, SGE_CELL and SGE_QMASTER_PORT in %ENV.
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $sge_job_script;
    my $sge_job_script_name;
    my $job_id;
    my $rsh_env = "";
    my @arguments;
    my $email_when = "";

    # Per-call state.  These must be lexical: the -v list and the argument
    # string are built by appending, so package globals would carry the
    # previous job's values into the next call.
    my $wall_time;
    my $cpu_time;
    my @new_env;
    my $args = '';

    # Parallel environment for this job.  It starts from the file-level
    # default and may be overridden by the job; working on a copy keeps the
    # default itself untouched.
    my $pe = $mpi_pe;

    # Backslash-escapes \ $ " and ` so that the text can be placed between
    # double quotes in the generated shell scripts.
    my $shell_escape = sub
    {
        my ($text) = @_;

        $text =~ s/\\/\\\\/g;
        $text =~ s/\$/\\\$/g;
        $text =~ s/"/\\\"/g;
        $text =~ s/`/\\\`/g;

        return $text;
    };

    $self->log("Entering SGE submit");

    #####
    # check jobtype
    #
    if(defined($description->jobtype()))
    {
        if($description->jobtype !~ /^$supported_job_types$/)
        {
            return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
        }
    }

    #####
    # check directory
    #
    if( $description->directory eq "")
    {
        return Globus::GRAM::Error::RSL_DIRECTORY();
    }
    chdir $description->directory() or
        return Globus::GRAM::Error::BAD_DIRECTORY();

    #####
    # check executable and stdin (relative paths are resolved against the
    # directory entered above)
    #
    if( $description->executable eq "")
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    elsif(! -f $description->executable())
    {
        return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
    }
    elsif(! -x $description->executable())
    {
        return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
    }
    elsif( $description->stdin() eq "")
    {
        return Globus::GRAM::Error::RSL_STDIN();
    }
    elsif(! -r $description->stdin())
    {
        return Globus::GRAM::Error::STDIN_NOT_FOUND();
    }


    #####
    # RSL attributes max_cpu_time/max_wall_time (given in minutes)
    # explicitly set the maximum cpu/wall time. max_time can be used
    # for both, max_cpu_time and max_wall_time

    #####
    # determining max_wall_time (0 means "use the queue default")
    #
    $self->log("Determining job WALL time");
    if(defined($description->max_wall_time()))
    {
        $wall_time = $description->max_wall_time();
        $self->log("  using max_wall_time of $wall_time minutes");
    }
    elsif(defined($description->max_time()))
    {
        $wall_time = $description->max_time();
        $self->log("  using max_wall_time of $wall_time minutes");
    }
    else
    {
        $wall_time = 0;
        $self->log("  using queue default");
    }

    #####
    # determining max_cpu_time (0 means "use the queue default")
    #
    $self->log("Determining job CPU time");
    if(defined($description->max_cpu_time()))
    {
        $cpu_time = $description->max_cpu_time();
        $self->log("  using max_cpu_time of $cpu_time minutes");
    }
    elsif(defined($description->max_time()))
    {
        $cpu_time = $description->max_time();
        $self->log("  using max_cpu_time of $cpu_time minutes");
    }
    else
    {
        $cpu_time = 0;
        $self->log("  using queue default");
    }


    #####
    # start building job script
    #
    $self->log('Building job script');


    #####
    # open script file; report a failure instead of dying on the first
    # print to an undefined handle
    #
    $sge_job_script_name = $self->job_dir() . '/scheduler_sge_job_script';
    $sge_job_script = IO::File->new($sge_job_script_name, '>');
    if(!defined($sge_job_script))
    {
        $self->log("  ERROR: cannot create $sge_job_script_name: $!");
        return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
    }


    #####
    # Writing script header
    #
    $sge_job_script->print("#!/bin/sh\n");
    $sge_job_script->print("# Grid Engine batch job script built by ");
    $sge_job_script->print("Globus job manager\n");
    $sge_job_script->print("\n");
    $sge_job_script->print("#\$ -S /bin/sh\n");


    #####
    # Whom to send email and when
    #
    if($description->email_address() ne '')
    {
        $sge_job_script->print("#\$ -M ". $description->email_address() ."\n");
        $self->log("Monitoring job by email");
        $self->log("  email address: " . $description->email_address());
    }
    if($description->emailonabort() eq 'yes')
    {
        $email_when .= 'a';
        $self->log("  email when job is aborted");
    }
    if($description->emailonexecution() eq 'yes')
    {
        $email_when .= 'b';
        $self->log("  email at the beginning of the job");
    }
    if($description->emailontermination() eq 'yes')
    {
        $email_when .= 'e';
        $self->log("  email at the end of the job");
    }
    if($description->emailonsuspend() eq 'yes')
    {
        $email_when .= 's';
        $self->log("  email when job is suspended");
    }
    if($email_when eq '')
    {
        $email_when = 'n';
        $self->log("  email(s) will not be sent");
    }
    $sge_job_script->print("#\$ -m $email_when\n");


    #####
    # Defines a list of queues used to execute this job
    #
    if(defined($description->queue()))
    {
        $sge_job_script->print("#\$ -q " . $description->queue() . "\n");
        $self->log("Using the following queues:");
        $self->log("  " . $description->queue);
    }


    #####
    # Writing project info.
    # NOTE(review): outside SGEEE mode the log says the project will be
    # ignored, yet the -P directive is still written; confirm which of the
    # two is intended before changing either.
    #
    $self->log("Checking project details");
    if(defined($description->project()))
    {
        if( $SGE_MODE eq "SGEEE" )
        {
            $self->log("  SGE Enterprise Edition: projects supported");
            $self->log("  Job assigned to " . $description->project());
            $sge_job_script->print("#\$ -P ". $description->project() ."\n");
        }
        else
        {
            $self->log("  SGE Regular Edition: NO project support");
            $self->log("  WARNING: Project set to " . $description->project());
            $self->log("    but it will be ignored by Grid Engine");
            $sge_job_script->print("#\$ -P ". $description->project() ."\n");
        }
    }
    else
    {
        $self->log("  Project not specified");
    }


    #####
    # wall_time was in minutes. Converting to SGE time format (h:m:s)
    #
    if($wall_time != 0)
    {
        my $wall_m = $wall_time % 60;
        my $wall_h = ( $wall_time - $wall_m ) / 60;

        $self->log("Using max WALL time (h:m:s) of $wall_h:$wall_m:00");
        $sge_job_script->print("#\$ -l h_rt=$wall_h:$wall_m:00\n");
    }

    #####
    # cpu_time was in minutes. Converting to SGE time format (h:m:s)
    #
    if($cpu_time != 0)
    {
        my $cpu_m = $cpu_time % 60;
        my $cpu_h = ( $cpu_time - $cpu_m ) / 60;

        $self->log("Using max CPU time (h:m:s) of $cpu_h:$cpu_m:00");
        $sge_job_script->print("#\$ -l h_cpu=$cpu_h:$cpu_m:00\n");
    }


    #####
    # RSL attribute for max_memory is given in Mb
    #
    my $max_memory = $description->max_memory();
    if($max_memory != 0)
    {
        $self->log("Total max memory flag is set to $max_memory Mb");
        $sge_job_script->print("#\$ -l h_data=$max_memory" . "M\n");
    }


    #####
    # Where to write output and error?
    #
    $sge_job_script->print("#\$ -o " . $description->stdout() . "\n");
    $sge_job_script->print("#\$ -e " . $description->stderr() . "\n");


    #####
    # Constructing the environment
    #
    foreach my $tuple ($description->environment())
    {
        if(!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }

        # The -v directive receives the name/value pair as given ...
        push(@new_env, $tuple->[0] . "=" . '"' . $tuple->[1] . '"');

        # ... while the shell fragment gets escaped copies, so the job
        # description itself is left unmodified.
        my $name  = $shell_escape->($tuple->[0]);
        my $value = $shell_escape->($tuple->[1]);

        #####
        # Special treatment for GRD_PE or SGE_PE.
        # If jobType is mpi or multiple, the value replaces the default PE
        #   for this job. Please note that this can in turn be overridden
        #   by the RSL attribute parallel_environment!
        #
        if (($name eq "GRD_PE") || ($name eq "SGE_PE"))
        {
            if($description->jobtype() eq "mpi" ||
               $description->jobtype() eq "multiple")
            {
                $pe = $value;
            }
        }
        else
        {
            $rsh_env .= $name . "=" . '"' . $value . '"'
                     .  "; export " . $name . ";\n";
        }
    }

    # Only emit -v when there is something to pass; a bare "-v" directive
    # has no variable list.
    if(@new_env)
    {
        $sge_job_script->print("#\$ -v " . join(',', @new_env) . "\n");
    }

    #####
    # Transforming arguments
    #
    @arguments = $description->arguments();

    foreach my $argument (@arguments)
    {
        if(ref($argument))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS();
        }
    }
    if(defined($arguments[0]))
    {
        foreach my $argument (@arguments)
        {
            $self->log("Transforming argument \"$argument\"");
            my $escaped = $shell_escape->($argument);
            $self->log("Transformed to \"$escaped\"");

            $args .= '"' . $escaped . '" ';
        }
    }


    #####
    # Determining job request type.
    #
    $self->log("Determining job type");
    $self->log("  Job is of type " . $description->jobtype());
    if($description->jobtype() eq "mpi" ||
       $description->jobtype() eq "multiple")
    {
        #####
        # An RSL attribute parallel_environment wins over everything else
        #
        if($description->parallel_environment())
        {
            $pe = $description->parallel_environment();
        }

        if(!$pe || $pe eq "NONE")
        {
            $self->log("ERROR: Parallel Environment (PE) failure!");
            $self->log("  MPI/multiple job was submitted, but no PE set");
            $self->log("  by neither user nor administrator");
            return Globus::GRAM::Error::INVALID_SCRIPT_REPLY();
        }

        $self->log("  PE is $pe");
        $sge_job_script->print("#\$ -pe $pe "
                               . $description->count() . "\n");

        #####
        # Load SGE settings
        #
        $sge_job_script->print(". $SGE_ROOT/$SGE_CELL/common/settings.sh\n");

        $sge_job_script->print("# Change to directory requested by user\n");
        $sge_job_script->print("cd " . $description->directory() . "\n");

        if($description->jobtype() eq "mpi")
        {
            #####
            # It's MPI job
            #
            if (($sun_mprun eq "no") && ($mpirun eq "no"))
            {
                return Globus::GRAM::Error::INVALID_SCRIPT_REPLY();
            }
            elsif ($sun_mprun ne "no")
            {
                #####
                # Using Sun's MPI.
                #
                $sge_job_script->print("$sun_mprun -np "
                                       . $description->count() ." "
                                       . $description->executable()
                                       . " $args < "
                                       . $description->stdin() . "\n");
            }
            else
            {
                #####
                # Using non-Sun's MPI.
                #
                $sge_job_script->print("P4_RSHCOMMAND=\$TMPDIR/rsh; "
                                       . "export P4_RSHCOMMAND\n");
                $sge_job_script->print("$mpirun -np ". $description->count()
                                       . " -machinefile \$TMPDIR/machines "
                                       . $description->executable()
                                       . " $args < "
                                       . $description->stdin() . "\n");
            }
        }
        else
        {
            #####
            # It's a multiple job: the command goes into a helper script
            # that the job script starts $count times, cycling over the
            # hosts listed in $TMPDIR/machines.
            #
            my $count = $description->count();
            my $stdin = $description->stdin();
            my $cmd_script_name =
                $self->job_dir() . '/scheduler_sge_cmd_script';
            my $cmd_script = IO::File->new($cmd_script_name, '>');

            if(!defined($cmd_script))
            {
                $self->log("  ERROR: cannot create $cmd_script_name: $!");
                return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
            }
            $cmd_script->print("#!/bin/sh\n");
            $cmd_script->print("$rsh_env\n");
            $cmd_script->print("cd " . $description->directory() . "\n");
            $cmd_script->print($description->executable() . " $args\n");
            if(!$cmd_script->close())
            {
                $self->log("  ERROR: cannot write $cmd_script_name: $!");
                return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
            }

            $self->log("  forking multiple requests");
            $sge_job_script->print(<<EOF);
        hosts=\`$cat \$TMPDIR/machines 2>/dev/null\`;
        if test -z "\$hosts"; then
            hosts=\$HOSTNAME
        fi
        counter=0
        while test \$counter -lt $count; do
            for host in \$hosts; do
                if test \$counter -lt $count; then
                    \$TMPDIR/rsh \$host "/bin/sh $cmd_script_name" < $stdin &
                    counter=\`expr \$counter + 1\`
                else
                    break
                fi
            done
        done
        wait
EOF
        }
    }
    else
    {
        #####
        # Load SGE settings
        #
        $sge_job_script->print(". $SGE_ROOT/$SGE_CELL/common/settings.sh\n");

        $sge_job_script->print("# Change to directory requested by user\n");
        $sge_job_script->print("cd " . $description->directory() . "\n");

        #####
        # Single execution job
        #
        $sge_job_script->print($description->executable() . " $args < "
                               . $description->stdin() . "\n");
    }

    #####
    # Flush the script to disk; buffered write errors surface at close()
    #
    if(!$sge_job_script->close())
    {
        $self->log("  ERROR: cannot write $sge_job_script_name: $!");
        return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();
    }
    $self->log("SGE job script successfully built! :-)");


    #####
    # Submitting a job
    #
    $self->log("Submitting a job");
    # [dwm] Set SGE_ROOT in the environment; environment now appears to get
    #       emptied in GT3.
    $ENV{"SGE_ROOT"} = $SGE_ROOT;
    $ENV{"SGE_CELL"} = $SGE_CELL;
    $ENV{"SGE_QMASTER_PORT"} = $SGE_QMASTER_PORT;
    chomp($job_id = `$qsub $sge_job_script_name`);

    if($? == 0)
    {
        $self->log("  successfully submitted");

        # The job id is taken from the third whitespace-separated word of
        # the qsub output.
        $job_id = (split(/\s+/, $job_id))[2];

        # in the case we used job arrays: keep only what precedes the
        # first dot
        $job_id =~ s/\..*//s;

        return {JOB_ID => $job_id,
                JOB_STATE => Globus::GRAM::JobState::PENDING };
    }
    else
    {
        $self->log("  ERROR: job submission failed");
        if ($description->project())
        {
            $self->log("  check if the project specified does exist");
        }
    }

    #####
    # If we reach this - invalid script response
    #
    return Globus::GRAM::Error::INVALID_SCRIPT_REPLY();
}


#########################################################################
#
# POLL
#
########################################################################
# poll()
#
# Looks up the job named by the job description's jobid() in the output of
# qstat and maps the status column to a GRAM job state.
#
# Returns: { JOB_STATE => <state> } where <state> is
#   DONE       when qstat lists no line for the job,
#   PENDING    when the status contains q, h, w or R,
#   SUSPENDED  when it contains s, S or T,
#   ACTIVE     when it contains t or r,
#   FAILED     for any other status.
#
# NOTE(review): qstat's stderr is discarded and its exit status is not
# checked, so a qstat that fails and prints nothing is also reported as
# DONE.
sub poll
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();
    my $state;

    $self->log("polling job $job_id");

    # [dwm]  Replacement state checking code.
    $ENV{"SGE_ROOT"} = $SGE_ROOT;
    $ENV{"SGE_CELL"} = $SGE_CELL;
    $ENV{"SGE_QMASTER_PORT"} = $SGE_QMASTER_PORT;

    # First qstat line that starts with the job id.  Leading blanks are
    # optional (\s*): qstat pads the id column on the left, and an id that
    # fills the column has no padding at all.  \Q..\E matches the id
    # literally rather than as a pattern.
    my ($job_line) = grep { /^\s*\Q$job_id\E\s/ } `$qstat 2> /dev/null`;

    if(!defined($job_line) || $job_line eq "")
    {
        # Job no longer exists in SGE job manager.  It must have finished.
        $self->log("Job $job_id has completed.");
        $state = Globus::GRAM::JobState::DONE;

        $self->log("Returning job success.");
    }
    else
    {
        # split(' ') skips leading blanks, so the fifth word is the status
        # column whether or not the line is padded on the left.
        my $status = (split(' ', $job_line))[4];
        $status = '' unless defined($status);

        if($status =~ /q|h|w|R/)
        {
            $self->log("  Job is still queued for execution.");
            $state = Globus::GRAM::JobState::PENDING;
        }
        elsif($status =~ /s|S|T/)
        {
            $self->log("  Job is suspended.");
            $state = Globus::GRAM::JobState::SUSPENDED;
        }
        elsif($status =~ /t|r/)
        {
            $self->log("  Job is running.");
            $state = Globus::GRAM::JobState::ACTIVE;
        }
        else
        {
            $self->log("  Job $job_id has failed!");
            $state = Globus::GRAM::JobState::FAILED;
        }
    }

    return {JOB_STATE => $state};
}



#########################################################################
#
# CANCEL
#
########################################################################
# cancel()
#
# Asks Grid Engine to delete the job named by the job description's jobid()
# by running qdel.
#
# Returns: { JOB_STATE => FAILED } when qdel exits with status 0, otherwise
# the JOB_CANCEL_FAILED error.
sub cancel
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();

    $self->log("cancel job $job_id");

    # Export the same Grid Engine settings as submit() and poll() do,
    # including the qmaster port, before calling the client program.
    $ENV{"SGE_ROOT"} = $SGE_ROOT;
    $ENV{"SGE_CELL"} = $SGE_CELL;
    $ENV{"SGE_QMASTER_PORT"} = $SGE_QMASTER_PORT;

    # qdel's output is not needed; only its exit status is examined.
    system("$qdel $job_id >/dev/null 2>/dev/null");

    if($? == 0)
    {
        return { JOB_STATE => Globus::GRAM::JobState::FAILED };
    }

    return Globus::GRAM::Error::JOB_CANCEL_FAILED();
}

1;
