code cleanup?
-------------

                 Key: HADOOP-1962
                 URL: https://issues.apache.org/jira/browse/HADOOP-1962
             Project: Hadoop
          Issue Type: Improvement
          Components: contrib/streaming
    Affects Versions: 0.14.1
            Reporter: Dick King


I would like some code placed in the released streaming, ...

This is a replacement for 
org.apache.hadoop.streaming.StreamJob.submitAndMonitorJob() :

New code:


  // Based on JobClient
  public void submitAndMonitorJob() throws IOException {

    if (jar_ != null && isLocalHadoop()) {
      // getAbs became required when shell and subvm have different working 
dirs...
      File wd = new File(".").getAbsoluteFile();
      StreamUtil.unJar(new File(jar_), wd);
    }

    // ecw - begin
    JobClient.runJob(jobConf_);
    /*
    // if jobConf_ changes must recreate a JobClient
    jc_ = new JobClient(jobConf_);
    boolean error = true;
    running_ = null;
    String lastReport = null;
    try {
      running_ = jc_.submitJob(jobConf_);
      jobId_ = running_.getJobID();

      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
      LOG.info("Running job: " + jobId_);
      jobInfo();

      while (!running_.isComplete()) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        running_ = jc_.getJob(jobId_);
        String report = null;
        report = " map " + Math.round(running_.mapProgress() * 100) + "%  
reduce "
          + Math.round(running_.reduceProgress() * 100) + "%";

        if (!report.equals(lastReport)) {
          LOG.info(report);
          lastReport = report;
        }
      }
      if (!running_.isSuccessful()) {
        jobInfo();
        throw new IOException("Job not Successful!");
      }
      LOG.info("Job complete: " + jobId_);
      LOG.info("Output: " + output_);
      error = false;
    } catch(FileNotFoundException fe){
      LOG.error("Error launching job , bad input path : " + fe.getMessage());
    }catch(InvalidJobConfException je){
      LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
    }catch(FileAlreadyExistsException fae){
      LOG.error("Error launching job , Output path already exists : " 
                + fae.getMessage());
    }catch(IOException ioe){
      LOG.error("Error Launching job : " + ioe.getMessage());
    }
    finally {
      if (error && (running_ != null)) {
        LOG.info("killJob...");
        running_.killJob();
      }
      jc_.close();
    }
    */
    // ecw - end
  }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to