[ 
https://issues.apache.org/jira/browse/METRON-1614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546915#comment-16546915
 ] 

ASF GitHub Bot commented on METRON-1614:
----------------------------------------

Github user merrimanr commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1108#discussion_r203110747
  
    --- Diff: 
metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
 ---
    @@ -230,69 +274,77 @@ protected void reduce(LongWritable key, 
Iterable<BytesWritable> values, Context
             , fs
             , filterImpl
         );
    -    if (sync) {
    -      job.waitForCompletion(true);
    -    } else {
    -      job.submit();
    -    }
    +    mrJob.submit();
    +    jobState = State.RUNNING;
    +    startJobStatusTimerThread(statusInterval);
         return this;
       }
     
    -  /**
    -   * Returns a lazily-read Iterable over a set of sequence files
    -   */
    -  private SequenceFileIterable readResults(Path outputPath, Configuration 
config, FileSystem fs) throws IOException {
    -    List<Path> files = new ArrayList<>();
    -    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, 
false); it.hasNext(); ) {
    -      Path p = it.next().getPath();
    -      if (p.getName().equals("_SUCCESS")) {
    -        fs.delete(p, false);
    -        continue;
    +  private void startJobStatusTimerThread(long interval) {
    +    timer = new Timer();
    +    timer.scheduleAtFixedRate(new TimerTask() {
    +      @Override
    +      public void run() {
    +        try {
    +          synchronized (jobState) {
    +            if (jobState == State.RUNNING) {
    +              if (mrJob.isComplete()) {
    +                switch (mrJob.getStatus().getState()) {
    +                  case SUCCEEDED:
    +                    jobState = State.FINALIZING;
    +                    if (setFinalResults(finalizer, configuration)) {
    +                      jobState = State.SUCCEEDED;
    +                    } else {
    +                      jobState = State.FAILED;
    +                    }
    +                    break;
    +                  case FAILED:
    +                    jobState = State.FAILED;
    +                    break;
    +                  case KILLED:
    +                    jobState = State.KILLED;
    +                    break;
    +                }
    +              }
    +              cancel(); // be gone, ye!
    --- End diff --
    
    This is incorrect.  Putting cancel() here will cancel the timer after the 
first run.  This call should be inside the previous if statement, after the 
switch statement.


> Create job status abstraction
> -----------------------------
>
>                 Key: METRON-1614
>                 URL: https://issues.apache.org/jira/browse/METRON-1614
>             Project: Metron
>          Issue Type: Sub-task
>            Reporter: Ryan Merriman
>            Assignee: Michael Miklavcic
>            Priority: Major
>
> It is possible to use different job engines such as MR or Spark. There should 
> be an abstraction that allows us to track status independent of the 
> underlying job engine. Initially we will use YARN/MR to run pcap query jobs. 
> We will also need a way to persist this information.
> Pcap job submission should be asynchronous. Some kind of id should be 
> returned upon successful job submission rather than blocking and waiting on 
> the job to complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to