This is an automated email from the ASF dual-hosted git repository. asdf2014 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 7702005 Use Closer instead of List<Closeable> (#8235) 7702005 is described below commit 7702005f8f8a8f3a81df44d3f7a7957cc58e7418 Author: Fokko Driesprong <fo...@apache.org> AuthorDate: Wed Aug 7 08:29:03 2019 +0200 Use Closer instead of List<Closeable> (#8235) * Use Closer instead of List<Closeable> * Process comments * Catch an Exception instead * Removed unused import --- .../apache/druid/java/util/common/io/Closer.java | 17 ++++++++++++--- .../epinephelinae/GroupByMergingQueryRunnerV2.java | 24 +++++++++++++--------- .../groupby/epinephelinae/GroupByRowProcessor.java | 17 ++++++++------- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java index f5bfb2c..b17e52c 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java +++ b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java @@ -108,12 +108,11 @@ public final class Closer implements Closeable } /** - * Registers the given {@code closeable} to be closed when this {@code Closer} is + * Registers the given {@code Closeable} to be closed when this {@code Closer} is * {@linkplain #close closed}. * - * @return the given {@code closeable} + * @return the given {@code Closeable} */ - // close. this word no longer has any meaning to me. public <C extends Closeable> C register(@Nullable C closeable) { if (closeable != null) { @@ -124,6 +123,18 @@ public final class Closer implements Closeable } /** + * Registers a list of {@code Closeable} to be closed when this {@code Closer} is + * {@linkplain #close closed}. + * + * @return the supplied list of {@code Closeable} + */ + public <C extends Closeable> Iterable<C> registerAll(Iterable<C> closeables) + { + closeables.forEach(this::register); + return closeables; + } + + /** * Stores the given throwable and rethrows it. It will be rethrown as is if it is an * {@code IOException}, {@code RuntimeException} or {@code Error}. Otherwise, it will be rethrown * wrapped in a {@code RuntimeException}. <b>Note:</b> Be sure to declare all of the checked diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index aa162db..398410b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -39,8 +39,8 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.AbstractPrioritizedCallable; import org.apache.druid.query.ChainedExecutionQueryRunner; @@ -58,7 +58,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas import java.io.File; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.CancellationException; @@ -166,7 +165,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow> @Override public CloseableGrouperIterator<RowBasedKey, ResultRow> make() { - final List<ReferenceCountingResourceHolder> resources = new ArrayList<>(); + final Closer resources = Closer.create(); try { final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( @@ -175,7 +174,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow> ); final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder = ReferenceCountingResourceHolder.fromCloseable(temporaryStorage); - resources.add(temporaryStorageHolder); + resources.register(temporaryStorageHolder); // If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1; @@ -185,7 +184,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow> hasTimeout, timeoutAt ); - resources.addAll(mergeBufferHolders); + resources.registerAll(mergeBufferHolders); final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder = mergeBufferHolders.get(0); final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder = numMergeBuffers == 2 ? @@ -214,7 +213,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow> final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder = ReferenceCountingResourceHolder.fromCloseable(grouper); - resources.add(grouperHolder); + resources.register(grouperHolder); ListenableFuture<List<AggregateResult>> futures = Futures.allAsList( Lists.newArrayList( @@ -280,13 +279,18 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow> return RowBasedGrouperHelper.makeGrouperIterator( grouper, query, - () -> Lists.reverse(resources).forEach(CloseQuietly::close) + resources ); } - catch (Throwable e) { + catch (Throwable t) { // Exception caught while setting up the iterator; release resources. - Lists.reverse(resources).forEach(CloseQuietly::close); - throw e; + try { + resources.close(); + } + catch (Exception ex) { + t.addSuppressed(ex); + } + throw t; } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index b33064a..f86b2f0 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -21,14 +21,13 @@ package org.apache.druid.query.groupby.epinephelinae; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; -import com.google.common.collect.Lists; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; @@ -40,8 +39,8 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -95,7 +94,7 @@ public class GroupByRowProcessor final int mergeBufferSize ) { - final List<Closeable> closeOnExit = new ArrayList<>(); + final Closer closeOnExit = Closer.create(); final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); final File temporaryStorageDirectory = new File( @@ -108,7 +107,7 @@ public class GroupByRowProcessor querySpecificConfig.getMaxOnDiskStorage() ); - closeOnExit.add(temporaryStorage); + closeOnExit.register(temporaryStorage); Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, ResultRow>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, @@ -120,7 +119,7 @@ public class GroupByRowProcessor public ByteBuffer get() { final ResourceHolder<ByteBuffer> mergeBufferHolder = resource.getMergeBuffer(); - closeOnExit.add(mergeBufferHolder); + closeOnExit.register(mergeBufferHolder); return mergeBufferHolder.get(); } }, @@ -130,7 +129,7 @@ public class GroupByRowProcessor ); final Grouper<RowBasedKey> grouper = pair.lhs; final Accumulator<AggregateResult, ResultRow> accumulator = pair.rhs; - closeOnExit.add(grouper); + closeOnExit.register(grouper); final AggregateResult retVal = rows.accumulate(AggregateResult.ok(), accumulator); @@ -147,9 +146,9 @@ public class GroupByRowProcessor } @Override - public void close() + public void close() throws IOException { - Lists.reverse(closeOnExit).forEach(CloseQuietly::close); + closeOnExit.close(); } }; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org