keith-turner commented on code in PR #5399: URL: https://github.com/apache/accumulo/pull/5399#discussion_r1992328131
########## server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java: ########## @@ -283,42 +286,77 @@ private void removeFile(ExecutorService deleteThreadPool, Path path, AtomicLong } catch (FileNotFoundException ex) { // ignored } catch (IOException ex) { - log.error("Unable to delete wal {}", path, ex); + log.error("Unable to delete {}", path, ex); } }); } private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) { final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS); + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + + final Map<Path,Future<?>> futures = new HashMap<>(collection.size()); final AtomicLong counter = new AtomicLong(); for (Pair<WalState,Path> stateFile : collection) { Path path = stateFile.getSecond(); - removeFile(deleteThreadPool, path, counter, - "Removing " + stateFile.getFirst() + " WAL " + path); + futures.put(path, removeFile(deleteThreadPool, path, counter, + "Removing " + stateFile.getFirst() + " WAL " + path)); } + while (!futures.isEmpty()) { + Iterator<Entry<Path,Future<?>>> iter = futures.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Path,Future<?>> f = iter.next(); + if (f.getValue().isDone()) { + try { + iter.remove(); + f.getValue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Uncaught exception deleting wal file" + f.getKey(), e); + } + } + } + } deleteThreadPool.shutdown(); try { while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { // empty } } catch (InterruptedException e1) { log.error("{}", e1.getMessage(), e1); } + status.currentLog.deleted += counter.get(); return counter.get(); } private long removeFiles(Collection<Path> values) { + final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(context.getConfiguration(), Property.GC_DELETE_THREADS); + .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); + final Map<Path,Future<?>> futures = new HashMap<>(values.size()); final AtomicLong counter = new AtomicLong(); for (Path path : values) { - removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path); + futures.put(path, + removeFile(deleteThreadPool, path, counter, "Removing recovery log " + path)); } + while (!futures.isEmpty()) { Review Comment: The get() method in a future waits until its done or failed, so should be able to loop through all futures once calling get. ```java futures.forEach((path,future)->{ try { future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Uncaught exception deleting recovery log file" + path, e); } }); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org