This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new bdb6f15 fixes #495 use file len cache for summary data (#741) bdb6f15 is described below commit bdb6f15a488cec871b6c6ebbe7820bc13c9153e5 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Nov 13 11:49:05 2018 -0500 fixes #495 use file len cache for summary data (#741) --- .../org/apache/accumulo/core/client/AccumuloClient.java | 2 -- .../java/org/apache/accumulo/core/summary/Gatherer.java | 13 ++++++++----- .../org/apache/accumulo/core/summary/SummaryReader.java | 9 ++++++--- .../main/java/org/apache/accumulo/tserver/TabletServer.java | 5 ++++- .../accumulo/tserver/TabletServerResourceManager.java | 10 ++++++++-- .../accumulo/tserver/compaction/MajorCompactionRequest.java | 11 +++++++---- .../java/org/apache/accumulo/tserver/tablet/Tablet.java | 4 +++- 7 files changed, 36 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java index 2955972..5371830 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java @@ -35,8 +35,6 @@ import org.apache.accumulo.core.security.Authorizations; * Supports fluent API for creation. Various options can be provided to {@link Accumulo#newClient()} * and when finished a call to build() will return the AccumuloClient object. For example: * - * <p> - * * <pre> * <code> * try (AccumuloClient client = Accumulo.newClient() diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java index 029e86b..2d94d39 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java @@ -82,6 +82,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; @@ -97,7 +98,7 @@ import com.google.common.hash.Hashing; * execute {@link #processPartition(ExecutorService, int, int)} * <li>{@link #processPartition(ExecutorService, int, int)} will make RPC calls to multiple tserver * to remotely execute - * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, ExecutorService)} + * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, Cache, ExecutorService)} * </ol> */ public class Gatherer { @@ -508,12 +509,12 @@ public class Gatherer { */ public Future<SummaryCollection> processFiles(FileSystemResolver volMgr, Map<String,List<TRowRange>> files, BlockCache summaryCache, BlockCache indexCache, - ExecutorService srp) { + Cache<String,Long> fileLenCache, ExecutorService srp) { List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>(); for (Entry<String,List<TRowRange>> entry : files.entrySet()) { futures.add(CompletableFuture.supplyAsync(() -> { List<RowRange> rrl = Lists.transform(entry.getValue(), RowRange::new); - return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache); + return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache, fileLenCache); }, srp)); } @@ -664,10 +665,12 @@ public class Gatherer { } private SummaryCollection getSummaries(FileSystemResolver volMgr, String file, - List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) { + List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache, + Cache<String,Long> fileLenCache) { Path path = new Path(file); Configuration conf = CachedConfiguration.getInstance(); return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path, - summarySelector, summaryCache, indexCache, cryptoService).getSummaries(ranges); + summarySelector, summaryCache, indexCache, fileLenCache, cryptoService) + .getSummaries(ranges); } } diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index 668de1e..ed71059 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -45,6 +45,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.WritableUtils; +import com.google.common.cache.Cache; + public class SummaryReader { private interface BlockReader { @@ -185,15 +187,16 @@ public class SummaryReader { public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf, SummarizerFactory factory, Path file, Predicate<SummarizerConfiguration> summarySelector, - BlockCache summaryCache, BlockCache indexCache, CryptoService cryptoService) { + BlockCache summaryCache, BlockCache indexCache, Cache<String,Long> fileLenCache, + CryptoService cryptoService) { CachableBlockFile.Reader bcReader = null; try { // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when // only summary data is wanted. CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache); - bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf, - cryptoService); + bcReader = new CachableBlockFile.Reader(fs, file, conf, fileLenCache, null, compositeCache, + null, aConf, cryptoService); return load(bcReader, summarySelector, factory); } catch (FileNotFoundException fne) { SummaryReader sr = new SummaryReader(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 9a2b1ff..4816697 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -275,6 +275,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; public class TabletServer implements Runnable { @@ -2118,9 +2119,11 @@ public class TabletServer implements Runnable { .getTableConfiguration(Table.ID.of(request.getTableId())); BlockCache summaryCache = resourceManager.getSummaryCache(); BlockCache indexCache = resourceManager.getIndexCache(); + Cache<String,Long> fileLenCache = resourceManager.getFileLenCache(); FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem(); Future<SummaryCollection> future = new Gatherer(context, request, tableCfg, - context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache, srp); + context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache, + fileLenCache, srp); return startSummaryOperation(credentials, future); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 7e8f891..9afba64 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -135,6 +135,8 @@ public class TabletServerResourceManager { private final ServerConfigurationFactory conf; private final ServerContext context; + private Cache<String,Long> fileLenCache; + private ExecutorService addEs(String name, ExecutorService tp) { if (threadPools.containsKey(name)) { throw new IllegalArgumentException( @@ -408,8 +410,8 @@ public class TabletServerResourceManager { int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES); - Cache<String,Long> fileLenCache = CacheBuilder.newBuilder() - .maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build(); + fileLenCache = CacheBuilder.newBuilder().maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)) + .build(); fileManager = new FileManager(tserver.getContext(), fs, maxOpenFiles, fileLenCache, _dCache, _iCache); @@ -1006,6 +1008,10 @@ public class TabletServerResourceManager { return _sCache; } + public Cache<String,Long> getFileLenCache() { + return fileLenCache; + } + public ExecutorService getSummaryRetrievalExecutor() { return summaryRetrievalPool; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java index 8b1ba08..9221a22 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java @@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; /** * Information that can be used to determine how a tablet is to be major compacted, if needed. @@ -65,10 +66,11 @@ public class MajorCompactionRequest implements Cloneable { private final BlockCache summaryCache; private Map<FileRef,DataFileValue> files; private final ServerContext context; + private final Cache<String,Long> fileLenCache; public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason, VolumeManager manager, AccumuloConfiguration tabletConfig, BlockCache summaryCache, - BlockCache indexCache, ServerContext context) { + BlockCache indexCache, Cache<String,Long> fileLenCache, ServerContext context) { this.extent = extent; this.reason = reason; this.volumeManager = manager; @@ -76,17 +78,18 @@ public class MajorCompactionRequest implements Cloneable { this.files = Collections.emptyMap(); this.summaryCache = summaryCache; this.indexCache = indexCache; + this.fileLenCache = fileLenCache; this.context = context; } public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason, AccumuloConfiguration tabletConfig, ServerContext context) { - this(extent, reason, null, tabletConfig, null, null, context); + this(extent, reason, null, tabletConfig, null, null, null, context); } public MajorCompactionRequest(MajorCompactionRequest mcr) { this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig, mcr.summaryCache, - mcr.indexCache, mcr.context); + mcr.indexCache, mcr.fileLenCache, mcr.context); // know this is already unmodifiable, no need to wrap again this.files = mcr.files; } @@ -155,7 +158,7 @@ public class MajorCompactionRequest implements Cloneable { Configuration conf = CachedConfiguration.getInstance(); SummaryCollection fsc = SummaryReader .load(fs, conf, tableConfig, factory, file.path(), summarySelector, summaryCache, - indexCache, context.getCryptoService()) + indexCache, fileLenCache, context.getCryptoService()) .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent))); sc.merge(fsc, factory); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 914f7f1..507ebdb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1823,8 +1823,10 @@ public class Tablet implements TabletCommitter { if (strategy != null) { BlockCache sc = tabletResources.getTabletServerResourceManager().getSummaryCache(); BlockCache ic = tabletResources.getTabletServerResourceManager().getIndexCache(); + Cache<String,Long> fileLenCache = tabletResources.getTabletServerResourceManager() + .getFileLenCache(); MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, - getTabletServer().getFileSystem(), tableConfiguration, sc, ic, context); + getTabletServer().getFileSystem(), tableConfiguration, sc, ic, fileLenCache, context); request.setFiles(getDatafileManager().getDatafileSizes()); strategy.gatherInformation(request); }