[ https://issues.apache.org/jira/browse/METRON-1614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546915#comment-16546915 ]
ASF GitHub Bot commented on METRON-1614:
----------------------------------------

Github user merrimanr commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1108#discussion_r203110747

    --- Diff: metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---
    @@ -230,69 +274,77 @@ protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context
                 , fs
                 , filterImpl
         );
    -    if (sync) {
    -      job.waitForCompletion(true);
    -    } else {
    -      job.submit();
    -    }
    +    mrJob.submit();
    +    jobState = State.RUNNING;
    +    startJobStatusTimerThread(statusInterval);
         return this;
       }

    -  /**
    -   * Returns a lazily-read Iterable over a set of sequence files
    -   */
    -  private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
    -    List<Path> files = new ArrayList<>();
    -    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) {
    -      Path p = it.next().getPath();
    -      if (p.getName().equals("_SUCCESS")) {
    -        fs.delete(p, false);
    -        continue;
    +  private void startJobStatusTimerThread(long interval) {
    +    timer = new Timer();
    +    timer.scheduleAtFixedRate(new TimerTask() {
    +      @Override
    +      public void run() {
    +        try {
    +          synchronized (jobState) {
    +            if (jobState == State.RUNNING) {
    +              if (mrJob.isComplete()) {
    +                switch (mrJob.getStatus().getState()) {
    +                  case SUCCEEDED:
    +                    jobState = State.FINALIZING;
    +                    if (setFinalResults(finalizer, configuration)) {
    +                      jobState = State.SUCCEEDED;
    +                    } else {
    +                      jobState = State.FAILED;
    +                    }
    +                    break;
    +                  case FAILED:
    +                    jobState = State.FAILED;
    +                    break;
    +                  case KILLED:
    +                    jobState = State.KILLED;
    +                    break;
    +                }
    +              }
    +              cancel(); // be gone, ye!
    --- End diff --

    This is incorrect. Putting cancel() here will cancel the timer after the first run, even when the job has not completed yet. This call should be inside the previous if statement (`if (mrJob.isComplete())`), after the switch statement.

> Create job status abstraction
> -----------------------------
>
>                 Key: METRON-1614
>                 URL: https://issues.apache.org/jira/browse/METRON-1614
>             Project: Metron
>          Issue Type: Sub-task
>            Reporter: Ryan Merriman
>            Assignee: Michael Miklavcic
>            Priority: Major
>
> It is possible to use different job engines such as MR or Spark. There should
> be an abstraction that allows us to track status independently of the
> underlying job engine. Initially we will use YARN/MR to run pcap query jobs.
> We will also need a way to persist this information.
> Pcap job submission should be asynchronous. Some kind of ID should be
> returned upon successful job submission rather than blocking and waiting on
> the job to complete.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)