Repository: giraph
Updated Branches:
  refs/heads/trunk b0262f8c8 -> 72562004c
GIRAPH-1086: Use pool of byte arrays with InMemoryDataAccessor Summary: Have a pool of byte arrays with InMemoryDataAccessor, to save on byte array creation and initialization. Test Plan: Improved performance of a job using InMemoryDataAccessor Differential Revision: https://reviews.facebook.net/D60621 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/72562004 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/72562004 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/72562004 Branch: refs/heads/trunk Commit: 72562004cfa5106472f61efc5d456788bbc256f9 Parents: b0262f8 Author: Maja Kabiljo <[email protected]> Authored: Mon Jul 11 11:07:18 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Wed Jul 13 10:26:46 2016 -0700 ---------------------------------------------------------------------- .../ooc/persistence/InMemoryDataAccessor.java | 100 +++++++++++++++++-- .../apache/giraph/utils/io/BigDataOutput.java | 52 +++++++--- 2 files changed, 133 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/72562004/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java index 4eca0f1..58fbcb6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java @@ -20,6 +20,8 @@ package org.apache.giraph.ooc.persistence; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import 
org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.io.BigDataInput; import org.apache.giraph.utils.io.BigDataOutput; @@ -27,18 +29,20 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; /** * Implementation of data accessor which keeps all the data serialized but in * memory. Useful to keep the number of used objects under control. - * - * TODO currently doesn't reuse any of the byte arrays so could cause more GCs */ public class InMemoryDataAccessor implements OutOfCoreDataAccessor { /** Configuration */ private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf; + /** Factory for data outputs */ + private final PooledBigDataOutputFactory outputFactory; /** DataInputOutput for each DataIndex used */ - private final ConcurrentHashMap<DataIndex, BigDataOutput> data; + private final ConcurrentHashMap< + DataIndex, PooledBigDataOutputFactory.Output> data; /** * Constructor @@ -48,6 +52,7 @@ public class InMemoryDataAccessor implements OutOfCoreDataAccessor { public InMemoryDataAccessor( ImmutableClassesGiraphConfiguration<?, ?, ?> conf) { this.conf = conf; + outputFactory = new PooledBigDataOutputFactory(conf); data = new ConcurrentHashMap<>(); } @@ -78,9 +83,9 @@ public class InMemoryDataAccessor implements OutOfCoreDataAccessor { DataIndex index, boolean shouldAppend) throws IOException { // Don't need to worry about synchronization here since only one thread // can deal with one index - BigDataOutput output = data.get(index); + PooledBigDataOutputFactory.Output output = data.get(index); if (output == null || !shouldAppend) { - output = new BigDataOutput(conf); + output = outputFactory.createOutput(); data.put(index, output); } return new InMemoryDataOutputWrapper(output); @@ -94,7 +99,7 @@ public class InMemoryDataAccessor implements OutOfCoreDataAccessor { /** * {@link DataOutputWrapper} implementation for 
{@link InMemoryDataAccessor} */ - public class InMemoryDataOutputWrapper implements DataOutputWrapper { + public static class InMemoryDataOutputWrapper implements DataOutputWrapper { /** Output to write data to */ private final BigDataOutput output; /** Size of output at the moment it was created */ @@ -150,9 +155,90 @@ public class InMemoryDataAccessor implements OutOfCoreDataAccessor { @Override public long finalizeInput(boolean deleteOnClose) { if (deleteOnClose) { - data.remove(index); + data.remove(index).returnData(); } return input.getPos(); } } + + /** + * Factory for pooled big data outputs + */ + private static class PooledBigDataOutputFactory { + /** How big pool of byte arrays to keep */ + public static final IntConfOption BYTE_ARRAY_POOL_SIZE = + new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024, + "How big pool of byte arrays to keep"); + /** How big byte arrays to make */ + public static final IntConfOption BYTE_ARRAY_SIZE = + new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21, + "How big byte arrays to make"); + + /** Configuration */ + private final ImmutableClassesGiraphConfiguration conf; + /** Pool of reusable byte[] */ + private final LinkedBlockingDeque<byte[]> byteArrayPool; + /** How big byte arrays to make */ + private final int byteArraySize; + + /** + * Constructor + * + * @param conf Configuration + */ + public PooledBigDataOutputFactory( + ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf)); + byteArraySize = BYTE_ARRAY_SIZE.get(conf); + } + + /** + * Create new output to write to + * + * @return Output to write to + */ + public Output createOutput() { + return new Output(conf); + } + + /** + * Implementation of BigDataOutput + */ + private class Output extends BigDataOutput { + /** + * Constructor + * + * @param conf Configuration + */ + public Output(ImmutableClassesGiraphConfiguration conf) { + super(conf); + } + + 
/** + * Return all data structures related to this data output. + * Can't use the same instance after this call anymore. + */ + protected void returnData() { + if (dataOutputs != null) { + for (ExtendedDataOutput dataOutput : dataOutputs) { + byteArrayPool.offer(dataOutput.getByteArray()); + } + } + byteArrayPool.offer(currentDataOutput.getByteArray()); + } + + @Override + protected ExtendedDataOutput createOutput(int size) { + byte[] data = byteArrayPool.pollLast(); + return conf.createExtendedDataOutput( + data == null ? new byte[byteArraySize] : data, 0); + } + + @Override + protected int getMaxSize() { + return byteArraySize; + } + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/72562004/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java index 9e84ebc..094e4a5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java @@ -50,11 +50,11 @@ public class BigDataOutput implements DataOutput, Writable { private static final int SIZE_DELTA = 100; /** Data output which we are currently writing to */ - private ExtendedDataOutput currentDataOutput; + protected ExtendedDataOutput currentDataOutput; /** List of filled outputs, will be null until we get a lot of data */ - private List<ExtendedDataOutput> dataOutputs; + protected List<ExtendedDataOutput> dataOutputs; /** Configuration */ - private final ImmutableClassesGiraphConfiguration conf; + protected final ImmutableClassesGiraphConfiguration conf; /** * Constructor @@ -75,7 +75,26 @@ public class BigDataOutput implements DataOutput, Writable { ImmutableClassesGiraphConfiguration conf) { this.conf = conf; dataOutputs = null; - currentDataOutput = 
conf.createExtendedDataOutput(initialSize); + currentDataOutput = createOutput(initialSize); + } + + /** + * Get max size for single data output + * + * @return Max size for single data output + */ + protected int getMaxSize() { + return MAX_SIZE; + } + + /** + * Create next data output + * + * @param size Size of data output to create + * @return Created data output + */ + protected ExtendedDataOutput createOutput(int size) { + return conf.createExtendedDataOutput(size); } /** @@ -85,16 +104,25 @@ public class BigDataOutput implements DataOutput, Writable { * @return DataOutput which data should be written to */ private ExtendedDataOutput getDataOutputToWriteTo() { - if (currentDataOutput.getPos() + SIZE_DELTA < MAX_SIZE) { - return currentDataOutput; - } else { + return getDataOutputToWriteTo(SIZE_DELTA); + } + + /** + * Get DataOutput which data should be written to. If current DataOutput is + * full it will create a new one. + * + * @param additionalSize How many additional bytes we need space for + * @return DataOutput which data should be written to + */ + private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) { + if (currentDataOutput.getPos() + additionalSize >= getMaxSize()) { if (dataOutputs == null) { - dataOutputs = new ArrayList<ExtendedDataOutput>(1); + dataOutputs = new ArrayList<>(1); } dataOutputs.add(currentDataOutput); - currentDataOutput = conf.createExtendedDataOutput(MAX_SIZE); - return currentDataOutput; + currentDataOutput = createOutput(getMaxSize()); } + return currentDataOutput; } /** @@ -147,12 +175,12 @@ public class BigDataOutput implements DataOutput, Writable { @Override public void write(byte[] b) throws IOException { - getDataOutputToWriteTo().write(b); + getDataOutputToWriteTo(b.length + SIZE_DELTA).write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { - getDataOutputToWriteTo().write(b, off, len); + getDataOutputToWriteTo(len + SIZE_DELTA).write(b, off, len); } @Override
