[ https://issues.apache.org/jira/browse/METRON-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578573#comment-16578573 ]

ASF GitHub Bot commented on METRON-1732:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1157#discussion_r209671313
  
    --- Diff: metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java ---
    @@ -99,10 +104,55 @@ protected PcapResultsWriter getResultsWriter() {
             LOG.warn("Unable to cleanup files in HDFS", e);
           }
         }
    +    LOG.info("Done finalizing results");
         return new PcapPages(outFiles);
       }
     
    -  protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, List<byte[]> data, Path outputPath) throws IOException;
    +  /**
    +   * Figure out how many threads to use in the thread pool. If it's a string and ends with "C",
    +   * then strip the C and treat it as an integral multiple of the number of cores.  If it's a
    +   * string and does not end with a C, then treat it as a number in string form.
    +   */
    +  private static int getNumThreads(String numThreads) {
    +    String numThreadsStr = ((String) numThreads).trim().toUpperCase();
    +    if (numThreadsStr.endsWith("C")) {
    +      Integer factor = Integer.parseInt(numThreadsStr.replace("C", ""));
    +      return factor * Runtime.getRuntime().availableProcessors();
    +    } else {
    +      return Integer.parseInt(numThreadsStr);
    +    }
    +  }
    +
    +  protected List<Path> writeParallel(Configuration hadoopConfig, Map<Path, List<byte[]>> toWrite,
    +      int parallelism) throws IOException {
    +    List<Path> outFiles = Collections.synchronizedList(new ArrayList<>());
    +    ForkJoinPool tp = new ForkJoinPool(parallelism);
    +    try {
    +      tp.submit(() -> {
    +        toWrite.entrySet().parallelStream().forEach(e -> {
    +          try {
    +            Path path = e.getKey();
    +            List<byte[]> data = e.getValue();
    +            if (data.size() > 0) {
    +              write(getResultsWriter(), hadoopConfig, data, path);
    +              outFiles.add(path);
    +            }
    +          } catch (IOException ioe) {
    +            throw new RuntimeException("Failed to write results", ioe);
    --- End diff --
    
    Can we add the path that failed to write to the exception message?
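
A minimal sketch of how the suggestion could look, written as a standalone class so it compiles without Hadoop or Metron on the classpath. The class name `ParallelWriteSketch`, the `PageWriter` interface (a hypothetical stand-in for `write(getResultsWriter(), hadoopConfig, data, path)`), and the use of `String` paths in place of Hadoop's `Path` are assumptions for illustration only. It also mirrors the "C" suffix convention described for `getNumThreads` (stripping only a trailing `C`), wraps the `IOException` in an `UncheckedIOException` whose message names the failing path, and unwraps the `ExecutionException` from `get()` so that message reaches the caller.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class ParallelWriteSketch {

  /** Hypothetical stand-in for write(getResultsWriter(), hadoopConfig, data, path). */
  @FunctionalInterface
  public interface PageWriter {
    void write(String path, List<byte[]> data) throws IOException;
  }

  /** "4" -> 4 threads; "2C" -> 2 x the number of available cores. */
  public static int getNumThreads(String numThreads) {
    String s = numThreads.trim().toUpperCase();
    if (s.endsWith("C")) {
      // Strip only the trailing "C" and scale by the core count.
      int factor = Integer.parseInt(s.substring(0, s.length() - 1));
      return factor * Runtime.getRuntime().availableProcessors();
    }
    return Integer.parseInt(s);
  }

  public static List<String> writeParallel(Map<String, List<byte[]>> toWrite,
      int parallelism, PageWriter writer) throws IOException {
    List<String> outFiles = Collections.synchronizedList(new ArrayList<>());
    // Submitting the parallel stream from inside this pool makes it run on the
    // pool's workers rather than the common pool (an implementation detail).
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      pool.submit(() -> toWrite.entrySet().parallelStream().forEach(e -> {
        String path = e.getKey();
        List<byte[]> data = e.getValue();
        if (data.isEmpty()) {
          return; // skip empty pages
        }
        try {
          writer.write(path, data);
          outFiles.add(path);
        } catch (IOException ioe) {
          // Include the failing path in the message, per the review comment.
          throw new UncheckedIOException("Failed to write results to " + path, ioe);
        }
      })).get(); // block until every page is written or one fails
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while writing results", ie);
    } catch (ExecutionException ee) {
      // Surface the worker's failure (and its message) as a checked IOException.
      Throwable cause = ee.getCause();
      throw new IOException(cause.getMessage(), cause);
    } finally {
      pool.shutdown();
    }
    return outFiles;
  }
}
```

For example, `getNumThreads("2C")` on a 4-core machine yields 8 threads. Using `UncheckedIOException` rather than a bare `RuntimeException` keeps the original `IOException` reachable as a typed cause while the message carries the path.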


> Fix job status liveness bug and parallelize finalizer file writing
> ------------------------------------------------------------------
>
>                 Key: METRON-1732
>                 URL: https://issues.apache.org/jira/browse/METRON-1732
>             Project: Metron
>          Issue Type: Sub-task
>            Reporter: Michael Miklavcic
>            Assignee: Michael Miklavcic
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
