Repository: giraph Updated Branches: refs/heads/trunk 28cbe037c -> b51ecd27c
GIRAPH-1085: Add InMemoryDataAccessor Summary: When we deal with graphs that have many vertices with very little total data associated with them (values + edges), we run into memory problems because too many objects are created: every vertex has multiple objects associated with it. To solve this problem, we should have a serialized partition representation (the current ByteArrayPartition keeps one byte[] per vertex, not one per partition). We can leverage the out-of-core infrastructure and just add a data accessor that is backed by in-memory buffers instead of disk. Test Plan: Successfully ran a job that was failing without this change. Differential Revision: https://reviews.facebook.net/D60435 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b51ecd27 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b51ecd27 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b51ecd27 Branch: refs/heads/trunk Commit: b51ecd27cccc520764c9ae53cabcb61d67d46d15 Parents: 28cbe03 Author: Maja Kabiljo <[email protected]> Authored: Wed Jul 6 14:57:33 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Jul 11 10:28:14 2016 -0700 ---------------------------------------------------------------------- .../giraph/ooc/data/DiskBackedDataStore.java | 3 +- .../ooc/persistence/InMemoryDataAccessor.java | 158 +++++++++++++++++++ .../ooc/persistence/OutOfCoreDataAccessor.java | 3 +- .../apache/giraph/utils/io/BigDataOutput.java | 15 ++ 4 files changed, 177 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java index e9ab167..c8da9a0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java @@ -241,8 +241,9 @@ public abstract class DiskBackedDataStore<T> { index.addIndex(DataIndex.TypeIndexEntry.BUFFER); OutOfCoreDataAccessor.DataInputWrapper inputWrapper = oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy()); + DataInput dataInput = inputWrapper.getDataInput(); for (int i = 0; i < numBuffers; ++i) { - T entry = readNextEntry(inputWrapper.getDataInput()); + T entry = readNextEntry(dataInput); addEntryToInMemoryPartitionData(partitionId, entry); } numBytes += inputWrapper.finalizeInput(true); http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java new file mode 100644 index 0000000..4eca0f1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/InMemoryDataAccessor.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.persistence; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.io.BigDataInput; +import org.apache.giraph.utils.io.BigDataOutput; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of data accessor which keeps all the data serialized but in + * memory. Useful to keep the number of used objects under control. + * + * Each {@link DataIndex} maps to a single {@link BigDataOutput} buffer kept in + * a {@link ConcurrentHashMap}; nothing is written to disk, so there is nothing + * to set up in {@link #initialize()} or tear down in {@link #shutdown()}. + * The threadId arguments are not used: all threads share the same map. + * + * TODO currently doesn't reuse any of the byte arrays so could cause more GCs + */ +public class InMemoryDataAccessor implements OutOfCoreDataAccessor { + /** Configuration */ + private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf; + /** + * Serialized data (one {@link BigDataOutput} buffer) for each DataIndex in + * use. NOTE(review): indexes are stored as keys as-is, so this assumes + * callers never mutate a DataIndex after passing it in -- confirm. + */ + private final ConcurrentHashMap<DataIndex, BigDataOutput> data; + + /** + * Constructor. The accessor starts with no stored data. + * + * @param conf Configuration; used to read the accessor thread count and to + * create new {@link BigDataOutput} buffers + */ + public InMemoryDataAccessor( + ImmutableClassesGiraphConfiguration<?, ?, ?> conf) { + this.conf = conf; + data = new ConcurrentHashMap<>(); + } + + @Override + public void initialize() { + // No-op: there are no files or other resources to set up + } + + @Override + public void shutdown() { + // No-op: nothing to release; stored buffers are not cleared here + } + + @Override + public int getNumAccessorThreads() { + // Uses the configured number of out-of-core threads + return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf); + } + + @Override + public DataInputWrapper prepareInput(int threadId, + DataIndex index) throws IOException { + // Reads from the buffer stored for this index without removing it (see + // finalizeInput). data.get() yields null when nothing is stored, so + // callers presumably check dataExist() first -- TODO confirm. + return new InMemoryDataInputWrapper( + new BigDataInput(data.get(index)), index); + } + + @Override + public
DataOutputWrapper prepareOutput(int threadId, + DataIndex index, boolean shouldAppend) throws IOException { + // Don't need to worry about synchronization here since only one thread + // can deal with one index. The existing buffer is reused only when + // appending; otherwise a fresh buffer replaces anything stored for index. + BigDataOutput output = data.get(index); + if (output == null || !shouldAppend) { + output = new BigDataOutput(conf); + data.put(index, output); + } + return new InMemoryDataOutputWrapper(output); + } + + @Override + public boolean dataExist(int threadId, DataIndex index) { + // True once prepareOutput() has stored a buffer for index, until + // finalizeInput(true) removes it + return data.containsKey(index); + } + + /** + * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}. + * Writes go straight into the buffer stored in the accessor's map. + */ + public class InMemoryDataOutputWrapper implements DataOutputWrapper { + /** Output to write data to */ + private final BigDataOutput output; + /** Size of the output at the moment this wrapper was created */ + private final long initialSize; + + /** + * Constructor + * + * @param output Output to write data to + */ + public InMemoryDataOutputWrapper(BigDataOutput output) { + this.output = output; + initialSize = output.getSize(); + } + + @Override + public DataOutput getDataOutput() { + return output; + } + + @Override + public long finalizeOutput() { + // Bytes added to the buffer since this wrapper was created; the buffer + // itself stays stored in the accessor + return output.getSize() - initialSize; + } + } + + /** + * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor} + */ + public class InMemoryDataInputWrapper implements DataInputWrapper { + /** Input to read data from */ + private final BigDataInput input; + /** DataIndex which this wrapper belongs to */ + private final DataIndex index; + + /** + * Constructor + * + * @param input Input to read data from + * @param index DataIndex which this wrapper belongs to + */ + public InMemoryDataInputWrapper( + BigDataInput input, DataIndex index) { + this.input = input; + this.index = index; + } + + @Override + public DataInput getDataInput() { + // Always the same instance, so consecutive reads continue where the + // previous one stopped + return input; + } + + @Override + public long finalizeInput(boolean deleteOnClose) { + // Optionally drop the stored buffer for this index, then report the + // input's current read position (bytes consumed, assuming it started + // at 0 -- TODO confirm against BigDataInput) + if (deleteOnClose) { + data.remove(index); + } + return input.getPos(); + } + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java index d4ddc62..cecb0f3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java @@ -82,7 +82,8 @@ public interface OutOfCoreDataAccessor { /** Interface to wrap <code>DataInput</code> */ interface DataInputWrapper { /** - * @return the <code>DataInput</code> + * @return the <code>DataInput</code>, should return the same instance + * every time it's called (not start from the beginning) */ DataInput getDataInput(); http://git-wip-us.apache.org/repos/asf/giraph/blob/b51ecd27/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java index c0fff60..9e84ebc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java @@ -125,6 +125,21 @@ public class BigDataOutput implements DataOutput, Writable { return conf; } + /** + * Get number of bytes written to this data output + * + * @return Size in bytes + */ + public long getSize() { + long size = currentDataOutput.getPos(); + if (dataOutputs != null) { + for (ExtendedDataOutput dataOutput : dataOutputs) { + size += dataOutput.getPos(); + } + } + return size; + } + @Override public void write(int b) throws IOException { getDataOutputToWriteTo().write(b);
