This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 5b4fe693dd Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083) 5b4fe693dd is described below commit 5b4fe693dd11d572f078620aa65e8b85726acdd5 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Dec 7 15:31:09 2022 -0500 Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083) This commit introduces a new property, table.compaction.major.output.drop.cache, that defaults to false and if true will call setDropBehind on the major compaction file output stream. --- .../org/apache/accumulo/core/conf/Property.java | 5 +++++ .../apache/accumulo/core/file/FileOperations.java | 5 +++++ .../accumulo/core/file/rfile/RFileOperations.java | 25 +++++++++++++++++++++- .../accumulo/server/compaction/FileCompactor.java | 17 +++++++++++++-- 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b5faf85f14..4e7760c68e 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -964,6 +964,11 @@ public enum Property { "1.3.5"), TABLE_ARBITRARY_PROP_PREFIX("table.custom.", null, PropertyType.PREFIX, "Prefix to be used for user defined arbitrary properties.", "1.7.0"), + TABLE_MAJC_OUTPUT_DROP_CACHE("table.compaction.major.output.drop.cache", "false", + PropertyType.BOOLEAN, + "Setting this property to true will call" + + " FSDataOutputStream.setDropBehind(true) on the major compaction output stream.", + "2.1.1"), TABLE_MAJC_RATIO("table.compaction.major.ratio", "3", PropertyType.FRACTION, "Minimum ratio of total input size to maximum input RFile size for" + " running a major compaction. 
", diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index 2a2b4aeaa5..db82b0d149 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -390,6 +390,11 @@ public abstract class FileOperations { return this; } + public WriterBuilder dropCachesBehind() { + this.dropCacheBehind(true); + return this; + } + public FileSKVWriter build() throws IOException { return openWriter(toWriterBuilderOptions(compression, outputStream, enableAccumuloStart)); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index beeeeda6ec..609c19550e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.rfile; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import org.apache.accumulo.core.client.sample.Sampler; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -36,14 +37,20 @@ import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.sample.impl.SamplerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; public class RFileOperations extends FileOperations { + private static final Logger LOG = 
LoggerFactory.getLogger(RFileOperations.class); + private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet(); private static RFile.Reader getReader(FileOptions options) throws IOException { @@ -129,7 +136,23 @@ public class RFileOperations extends FileOperations { String file = options.getFilename(); FileSystem fs = options.getFileSystem(); - outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block); + if (options.dropCacheBehind) { + EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE); + outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize, + (short) rep, block, null); + try { + // Tell the DataNode that the file does not need to be cached in the OS page cache + outputStream.setDropBehind(Boolean.TRUE); + LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.filename); + } catch (UnsupportedOperationException e) { + LOG.debug("setDropBehind not enabled for file: {}", options.filename); + } catch (IOException e) { + LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.filename, + e.getMessage()); + } + } else { + outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block); + } } BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression, diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index e083c5aa14..a7e879dc8b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import 
org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileOperations.WriterBuilder; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; @@ -50,6 +52,8 @@ import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterato import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -212,9 +216,18 @@ public class FileCompactor implements Callable<CompactionStats> { try { FileOperations fileFactory = FileOperations.getInstance(); FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath()); - mfw = fileFactory.newWriterBuilder() + + boolean dropCacheBehindMajcOutput = !RootTable.ID.equals(this.extent.tableId()) + && !MetadataTable.ID.equals(this.extent.tableId()) + && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE); + + WriterBuilder outBuilder = fileFactory.newWriterBuilder() .forFile(outputFile.getMetaInsert(), ns, ns.getConf(), cryptoService) - .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()).build(); + .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()); + if (dropCacheBehindMajcOutput) { + outBuilder.dropCachesBehind(); + } + mfw = outBuilder.build(); 
Map<String,Set<ByteSequence>> lGroups = getLocalityGroups(acuTableConf);