http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java new file mode 100644 index 0000000..ece2fad --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.metrics; + +/** + * Aggregator over metrics with long values + */ +public final class AggregatedMetricLong extends AggregatedMetric<Long> { + /** + * Constructor + */ + public AggregatedMetricLong() { + min = new ValueWithHostname<>(Long.MAX_VALUE); + max = new ValueWithHostname<>(Long.MIN_VALUE); + sum = (long) 0; + } + + @Override + public void addItem(Long value, String hostnamePartitionId) { + if (value < min.getValue()) { + min.set(value, hostnamePartitionId); + } + if (value > max.getValue()) { + max.set(value, hostnamePartitionId); + } + sum += value; + count++; + } + + @Override + public double mean() { + return sum / (double) count; + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java index 9d8d48f..3ef7127 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java @@ -19,6 +19,8 @@ package org.apache.giraph.metrics; import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.OutOfCoreIOCallable; import org.apache.giraph.worker.BspServiceWorker; import com.google.common.collect.Maps; @@ -31,7 +33,7 @@ import java.util.Map; */ public class AggregatedMetrics { /** Mapping from name to aggregated metric */ - private Map<String, AggregatedMetric> metrics = Maps.newHashMap(); + private Map<String, AggregatedMetric<?>> metrics = Maps.newHashMap(); /** * Add value from hostname for a metric. @@ -43,9 +45,30 @@ public class AggregatedMetrics { */ public AggregatedMetrics add(String name, long value, String hostnamePartitionId) { - AggregatedMetric aggregatedMetric = metrics.get(name); + AggregatedMetricLong aggregatedMetric = + (AggregatedMetricLong) metrics.get(name); if (aggregatedMetric == null) { - aggregatedMetric = new AggregatedMetric(); + aggregatedMetric = new AggregatedMetricLong(); + metrics.put(name, aggregatedMetric); + } + aggregatedMetric.addItem(value, hostnamePartitionId); + return this; + } + + /** + * Add value from hostname for a metric. 
+ * + * @param name String name of metric + * @param value double value to track + * @param hostnamePartitionId String host it came from + * @return this + */ + public AggregatedMetrics add(String name, double value, + String hostnamePartitionId) { + AggregatedMetricDouble aggregatedMetric = + (AggregatedMetricDouble) metrics.get(name); + if (aggregatedMetric == null) { + aggregatedMetric = new AggregatedMetricDouble(); metrics.put(name, aggregatedMetric); } aggregatedMetric.addItem(value, hostnamePartitionId); @@ -71,6 +94,12 @@ public class AggregatedMetrics { workerMetrics.getTimeToFirstMsg(), hostname); add(BspServiceWorker.TIMER_WAIT_REQUESTS, workerMetrics.getWaitRequestsTimer(), hostname); + add(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK, + workerMetrics.getBytesLoadedFromDisk(), hostname); + add(OutOfCoreIOCallable.BYTES_STORE_TO_DISK, + workerMetrics.getBytesStoredOnDisk(), hostname); + add(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY, + workerMetrics.getGraphPercentageInMemory(), hostname); return this; } @@ -89,6 +118,12 @@ public class AggregatedMetrics { get(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG); AggregatedMetric waitRequestsMicros = get( BspServiceWorker.TIMER_WAIT_REQUESTS); + AggregatedMetric bytesLoaded = + get(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK); + AggregatedMetric bytesStored = + get(OutOfCoreIOCallable.BYTES_STORE_TO_DISK); + AggregatedMetric graphInMem = + get(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY); out.println(); out.println("--- METRICS: superstep " + superstep + " ---"); @@ -97,6 +132,9 @@ public class AggregatedMetrics { printAggregatedMetric(out, "network communication time", "ms", commTime); printAggregatedMetric(out, "time to first message", "us", timeToFirstMsg); printAggregatedMetric(out, "wait requests time", "us", waitRequestsMicros); + printAggregatedMetric(out, "bytes loaded from disk", "bytes", bytesLoaded); + printAggregatedMetric(out, "bytes stored to disk", "bytes", bytesStored); + printAggregatedMetric(out, "graph 
in mem", "%", graphInMem); return this; } @@ -106,17 +144,17 @@ public class AggregatedMetrics { * * @param out PrintStream to write to * @param header String header to print. - * @param timeUnit String time unit of metric + * @param unit String unit of metric * @param aggregatedMetric AggregatedMetric to write */ private void printAggregatedMetric(PrintStream out, String header, - String timeUnit, + String unit, AggregatedMetric aggregatedMetric) { if (aggregatedMetric.hasData()) { out.println(header); - out.println(" mean: " + aggregatedMetric.mean() + " " + timeUnit); - printValueFromHost(out, " slowest: ", timeUnit, aggregatedMetric.max()); - printValueFromHost(out, " fastest: ", timeUnit, aggregatedMetric.min()); + out.println(" mean: " + aggregatedMetric.mean() + " " + unit); + printValueFromHost(out, " smallest: ", unit, aggregatedMetric.max()); + printValueFromHost(out, " largest: ", unit, aggregatedMetric.min()); } else { out.println(header + ": NO DATA"); } @@ -127,12 +165,12 @@ public class AggregatedMetrics { * * @param out PrintStream to write to * @param prefix String to write at beginning - * @param timeUnit String timeUnit of metric + * @param unit String unit of metric * @param vh ValueWithHostname to write */ private void printValueFromHost(PrintStream out, String prefix, - String timeUnit, ValueWithHostname vh) { - out.println(prefix + vh.getValue() + ' ' + timeUnit + + String unit, ValueWithHostname vh) { + out.println(prefix + vh.getValue() + ' ' + unit + " from " + vh.getHostname()); } @@ -151,7 +189,7 @@ public class AggregatedMetrics { * * @return Map of all the aggregated metrics. 
*/ - public Map<String, AggregatedMetric> getAll() { + public Map<String, AggregatedMetric<?>> getAll() { return metrics; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java b/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java index 208c4c7..91a7398 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java @@ -20,27 +20,29 @@ package org.apache.giraph.metrics; /** * Pair of value with host it came from. + * + * @param <T> types of value (either long or double) */ -class ValueWithHostname { +class ValueWithHostname<T extends Number> { /** long value we're holding */ - private long value; + private T value; /** host associated with value */ private String hostname; /** * Create with initial value * - * @param value long initial value to use + * @param value initial value to use */ - public ValueWithHostname(long value) { + public ValueWithHostname(T value) { this.value = value; this.hostname = null; } /** - * @return long value + * @return value */ - public long getValue() { + public T getValue() { return value; } @@ -62,10 +64,10 @@ class ValueWithHostname { /** * Set value and partition together. - * @param value long value to use. + * @param value value to use. * @param hostname String host it came from. 
*/ - public void set(long value, String hostname) { + public void set(T value, String hostname) { this.value = value; this.hostname = hostname; } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java index 05ec55b..219bcbd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java @@ -19,6 +19,8 @@ package org.apache.giraph.metrics; import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.OutOfCoreIOCallable; import org.apache.giraph.worker.BspServiceWorker; import org.apache.hadoop.io.Writable; @@ -28,6 +30,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.PrintStream; +import java.util.concurrent.TimeUnit; /** * Per-superstep metrics for a Worker. 
@@ -43,6 +46,14 @@ public class WorkerSuperstepMetrics implements Writable { private LongAndTimeUnit superstepTimer; /** Time spent waiting for other workers to finish */ private LongAndTimeUnit waitRequestsTimer; + /** Time spent doing GC in a superstep */ + private LongAndTimeUnit superstepGCTimer; + /** Number of bytes loaded from disk to memory in out-of-core mechanism */ + private long bytesLoadedFromDisk; + /** Number of bytes stored from memory to disk in out-of-core mechanism */ + private long bytesStoredOnDisk; + /** Percentage of graph kept in memory */ + private double graphPercentageInMemory; /** * Constructor @@ -53,6 +64,11 @@ public class WorkerSuperstepMetrics implements Writable { timeToFirstMsg = new LongAndTimeUnit(); superstepTimer = new LongAndTimeUnit(); waitRequestsTimer = new LongAndTimeUnit(); + superstepGCTimer = new LongAndTimeUnit(); + superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS); + bytesLoadedFromDisk = 0; + bytesStoredOnDisk = 0; + graphPercentageInMemory = 0; } /** @@ -66,6 +82,20 @@ public class WorkerSuperstepMetrics implements Writable { readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg); readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer); readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer); + SuperstepMetricsRegistry registry = GiraphMetrics.get().perSuperstep(); + superstepGCTimer.setValue( + registry.getCounter(GraphTaskManager.TIMER_SUPERSTEP_GC_TIME).count()); + bytesLoadedFromDisk = + registry.getCounter(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK).count(); + bytesStoredOnDisk = + registry.getCounter(OutOfCoreIOCallable.BYTES_STORE_TO_DISK).count(); + Gauge<Double> gauge = + registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY); + if (gauge != null) { + graphPercentageInMemory = gauge.value(); + } else { + graphPercentageInMemory = 100; + } return this; } @@ -99,6 +129,9 @@ public class WorkerSuperstepMetrics implements Writable { 
out.println("--- METRICS: superstep " + superstep + " ---"); out.println(" superstep time: " + superstepTimer); out.println(" compute all partitions: " + computeAllTimer); + out.println(" time spent in gc: " + superstepGCTimer); + out.println(" bytes transferred in out-of-core: " + + (bytesLoadedFromDisk + bytesStoredOnDisk)); out.println(" network communication time: " + commTimer); out.println(" time to first message: " + timeToFirstMsg); out.println(" wait on requests time: " + waitRequestsTimer); @@ -140,6 +173,29 @@ public class WorkerSuperstepMetrics implements Writable { return waitRequestsTimer.getValue(); } + /** + * @return number of bytes loaded from disk by out-of-core mechanism (if any + * is used) + */ + public long getBytesLoadedFromDisk() { + return bytesLoadedFromDisk; + } + + /** + * @return number of bytes stored on disk by out-of-core mechanism (if any is + * used) + */ + public long getBytesStoredOnDisk() { + return bytesStoredOnDisk; + } + + /** + * @return a rough estimate of percentage of graph in memory + */ + public double getGraphPercentageInMemory() { + return graphPercentageInMemory; + } + @Override public void readFields(DataInput dataInput) throws IOException { commTimer.setValue(dataInput.readLong()); @@ -147,6 +203,9 @@ public class WorkerSuperstepMetrics implements Writable { timeToFirstMsg.setValue(dataInput.readLong()); superstepTimer.setValue(dataInput.readLong()); waitRequestsTimer.setValue(dataInput.readLong()); + bytesLoadedFromDisk = dataInput.readLong(); + bytesStoredOnDisk = dataInput.readLong(); + graphPercentageInMemory = dataInput.readDouble(); } @Override @@ -156,5 +215,8 @@ public class WorkerSuperstepMetrics implements Writable { dataOutput.writeLong(timeToFirstMsg.getValue()); dataOutput.writeLong(superstepTimer.getValue()); dataOutput.writeLong(waitRequestsTimer.getValue()); + dataOutput.writeLong(bytesLoadedFromDisk); + dataOutput.writeLong(bytesStoredOnDisk); + dataOutput.writeDouble(graphPercentageInMemory); } } 
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java deleted file mode 100644 index 9324239..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.ooc.data.MetaPartitionManager; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.ooc.io.LoadPartitionIOCommand; -import org.apache.giraph.ooc.io.StorePartitionIOCommand; -import org.apache.giraph.ooc.io.WaitIOCommand; -import org.apache.log4j.Logger; - -/** - * Out-of-core engine maintaining fixed number of partitions in memory. - */ -public class FixedOutOfCoreEngine extends OutOfCoreEngine { - /** Class logger. 
*/ - private static final Logger LOG = - Logger.getLogger(FixedOutOfCoreEngine.class); - /** - * When getting partitions, how many milliseconds to wait if no partition was - * available in memory - */ - private static final long MSEC_TO_WAIT = 1000; - /** - * Dummy object to wait on until a partition becomes available in memory - * for processing - */ - private final Object partitionAvailable = new Object(); - - /** - * Constructor - * - * @param conf Configuration - * @param service Service worker - * @param maxPartitionsInMemory Maximum number of partitions that can be kept - * in memory - */ - public FixedOutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf, - CentralizedServiceWorker<?, ?, ?> service, - int maxPartitionsInMemory) { - super(conf, service); - this.ioScheduler = new FixedOutOfCoreIOScheduler(maxPartitionsInMemory, - numIOThreads, this, conf); - } - - @Override - public Integer getNextPartition() { - Integer partitionId; - synchronized (partitionAvailable) { - while ((partitionId = metaPartitionManager.getNextPartition()) == null) { - try { - if (LOG.isInfoEnabled()) { - LOG.info("getNextPartition: waiting until a partition becomes " + - "available!"); - } - partitionAvailable.wait(MSEC_TO_WAIT); - } catch (InterruptedException e) { - throw new IllegalStateException("getNextPartition: caught " + - "InterruptedException while waiting to retrieve a partition to " + - "process"); - } - if (jobFailed) { - throw new RuntimeException("Job Failed due to a failure in an " + - "out-of-core IO thread"); - } - } - } - if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) { - partitionId = null; - } - return partitionId; - } - - @Override - public void doneProcessingPartition(int partitionId) { - metaPartitionManager.setPartitionIsProcessed(partitionId); - // Put the partition in store IO command queue and announce this partition - // as a candidate to offload to disk. 
- if (LOG.isInfoEnabled()) { - LOG.info("doneProcessingPartition: processing partition " + partitionId + - " is done!"); - } - ioScheduler.addIOCommand(new StorePartitionIOCommand(this, partitionId)); - } - - @Override - public void startIteration() { - getSuperstepLock().writeLock().lock(); - metaPartitionManager.resetPartition(); - ((FixedOutOfCoreIOScheduler) ioScheduler).clearStoreCommandQueue(); - getSuperstepLock().writeLock().unlock(); - } - - @Override - public void retrievePartition(int partitionId) { - long superstep = service.getSuperstep(); - if (metaPartitionManager.isPartitionOnDisk(partitionId)) { - ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId, - superstep)); - synchronized (partitionAvailable) { - while (metaPartitionManager.isPartitionOnDisk(partitionId)) { - try { - if (LOG.isInfoEnabled()) { - LOG.info("retrievePartition: waiting until partition " + - partitionId + " becomes available"); - } - partitionAvailable.wait(); - } catch (InterruptedException e) { - throw new IllegalStateException("retrievePartition: caught " + - "InterruptedException while waiting to retrieve partition " + - partitionId); - } - } - } - } - } - - @Override - public void ioCommandCompleted(IOCommand command) { - if (command instanceof LoadPartitionIOCommand || - command instanceof WaitIOCommand) { - // Notifying compute threads who are waiting for a partition to become - // available in memory to process. 
- synchronized (partitionAvailable) { - partitionAvailable.notifyAll(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java deleted file mode 100644 index 2cb002f..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.ooc; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.ooc.io.IOCommand; -import org.apache.giraph.ooc.io.LoadPartitionIOCommand; -import org.apache.giraph.ooc.io.StoreDataBufferIOCommand; -import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand; -import org.apache.giraph.ooc.io.StorePartitionIOCommand; -import org.apache.giraph.ooc.io.WaitIOCommand; -import org.apache.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.base.Preconditions.checkState; - -/** - * IO Scheduler for out-of-core mechanism with fixed number of partitions in - * memory - */ -public class FixedOutOfCoreIOScheduler extends OutOfCoreIOScheduler { - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(FixedOutOfCoreIOScheduler.class); - /** Maximum number of partitions to be kept in memory */ - private final int maxPartitionsInMemory; - /** - * Number of partitions to be added (loaded) or removed (stored) to/from - * memory. Each outstanding load partition counts +1 and each outstanding - * store partition counts -1 toward this counter. - */ - private final AtomicInteger deltaNumPartitionsInMemory = - new AtomicInteger(0); - /** Queue of IO commands for loading partitions to memory */ - private final List<Queue<IOCommand>> threadLoadCommandQueue; - /** Queue of IO commands for storing partition on disk */ - private final List<Queue<IOCommand>> threadStoreCommandQueue; - /** Whether IO threads should terminate */ - private volatile boolean shouldTerminate; - - /** - * Constructor - * @param maxPartitionsInMemory maximum number of partitions can be kept in - * memory - * @param numThreads number of available IO threads (i.e. 
disks) - * @param oocEngine out-of-core engine - * @param conf configuration - */ - public FixedOutOfCoreIOScheduler(int maxPartitionsInMemory, int numThreads, - OutOfCoreEngine oocEngine, - ImmutableClassesGiraphConfiguration conf) { - super(conf, oocEngine, numThreads); - this.maxPartitionsInMemory = maxPartitionsInMemory; - threadLoadCommandQueue = new ArrayList<>(numThreads); - threadStoreCommandQueue = new ArrayList<>(numThreads); - for (int i = 0; i < numThreads; ++i) { - threadLoadCommandQueue.add( - new ConcurrentLinkedQueue<IOCommand>()); - threadStoreCommandQueue.add( - new ConcurrentLinkedQueue<IOCommand>()); - } - shouldTerminate = false; - } - - @Override - public IOCommand getNextIOCommand(int threadId) { - if (shouldTerminate) { - return null; - } - int numPartitionsInMemory = - oocEngine.getMetaPartitionManager().getNumInMemoryPartitions(); - IOCommand command = null; - if (LOG.isInfoEnabled()) { - LOG.info("getNextIOCommand with " + numPartitionsInMemory + - " partitions in memory, " + deltaNumPartitionsInMemory.get() + - " on the fly"); - } - // Check if we have to store a partition on disk - if (numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() > - maxPartitionsInMemory) { - command = threadStoreCommandQueue.get(threadId).poll(); - if (command == null) { - Integer partitionId = oocEngine.getMetaPartitionManager() - .getOffloadPartitionId(threadId); - if (partitionId != null) { - command = new StorePartitionIOCommand(oocEngine, partitionId); - } else { - deltaNumPartitionsInMemory.getAndIncrement(); - } - } else { - checkState(command instanceof StorePartitionIOCommand, - "getNextIOCommand: Illegal command type in store command queue!"); - } - } else { - // Roll back the decrement in delta counter. 
- deltaNumPartitionsInMemory.getAndIncrement(); - } - - // Check if there is any buffers/messages of current out-of-core partitions - // in memory - if (command == null) { - Integer partitionId = oocEngine.getMetaPartitionManager() - .getOffloadPartitionBufferId(threadId); - if (partitionId != null) { - command = new StoreDataBufferIOCommand(oocEngine, partitionId, - StoreDataBufferIOCommand.DataBufferType.PARTITION); - } else { - partitionId = oocEngine.getMetaPartitionManager() - .getOffloadMessageBufferId(threadId); - if (partitionId != null) { - command = new StoreDataBufferIOCommand(oocEngine, partitionId, - StoreDataBufferIOCommand.DataBufferType.MESSAGE); - } else { - partitionId = oocEngine.getMetaPartitionManager() - .getOffloadMessageId(threadId); - if (partitionId != null) { - command = new StoreIncomingMessageIOCommand(oocEngine, partitionId); - } - } - } - } - - // Check if we can load/prefetch a partition to memory - if (command == null) { - if (numPartitionsInMemory + - deltaNumPartitionsInMemory.getAndIncrement() <= - maxPartitionsInMemory) { - command = threadLoadCommandQueue.get(threadId).poll(); - if (command == null) { - Integer partitionId = oocEngine.getMetaPartitionManager() - .getPrefetchPartitionId(threadId); - if (partitionId != null) { - command = new LoadPartitionIOCommand(oocEngine, partitionId, - oocEngine.getServiceWorker().getSuperstep()); - } else { - deltaNumPartitionsInMemory.getAndDecrement(); - } - } - } else { - // Roll back the increment in delta counter. 
- deltaNumPartitionsInMemory.getAndDecrement(); - } - } - - // Check if no appropriate IO command is found - if (command == null) { - command = new WaitIOCommand(oocEngine, waitInterval); - } - return command; - } - - @Override - public void ioCommandCompleted(IOCommand command) { - if (command instanceof LoadPartitionIOCommand) { - deltaNumPartitionsInMemory.getAndDecrement(); - } else if (command instanceof StorePartitionIOCommand) { - deltaNumPartitionsInMemory.getAndIncrement(); - } - oocEngine.ioCommandCompleted(command); - } - - @Override - public void addIOCommand(IOCommand ioCommand) { - int ownerThread = getOwnerThreadId(ioCommand.getPartitionId()); - if (ioCommand instanceof LoadPartitionIOCommand) { - threadLoadCommandQueue.get(ownerThread).offer(ioCommand); - } else if (ioCommand instanceof StorePartitionIOCommand) { - threadStoreCommandQueue.get(ownerThread).offer(ioCommand); - } else { - throw new IllegalStateException("addIOCommand: IO command type is not " + - "supported for addition"); - } - } - - @Override - public void shutdown() { - super.shutdown(); - shouldTerminate = true; - } - - /** - * Clear store command queue (should happen at the beginning of each iteration - * to eliminate eager store commands generated by OOC engine) - */ - public void clearStoreCommandQueue() { - for (int i = 0; i < threadStoreCommandQueue.size(); ++i) { - threadStoreCommandQueue.get(i).clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java new file mode 100644 index 0000000..f7badcb --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software 
package org.apache.giraph.ooc;

import com.sun.management.GarbageCollectionNotificationInfo;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.ooc.io.IOCommand;
import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
import org.apache.giraph.ooc.io.StorePartitionIOCommand;
import org.apache.log4j.Logger;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Oracle for fixed out-of-core mechanism: steers IO so that the number of
 * partitions resident in memory stays at the configured maximum
 * (GiraphConstants.MAX_PARTITIONS_IN_MEMORY).
 */
public class FixedPartitionsOracle implements OutOfCoreOracle {
  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(FixedPartitionsOracle.class);
  /** Maximum number of partitions to be kept in memory */
  private final int maxPartitionsInMemory;
  /**
   * Number of partitions to be added (loaded) or removed (stored) to/from
   * memory. Each outstanding load partition counts +1 and each outstanding
   * store partition counts -1 toward this counter.
   */
  private final AtomicInteger deltaNumPartitionsInMemory =
      new AtomicInteger(0);
  /** Out-of-core engine */
  private final OutOfCoreEngine oocEngine;

  /**
   * Constructor
   *
   * @param conf configuration
   * @param oocEngine out-of-core engine
   */
  public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
      OutOfCoreEngine oocEngine) {
    this.maxPartitionsInMemory =
        GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
    this.oocEngine = oocEngine;
  }

  /**
   * Decide which IO actions are allowed next, based on the number of
   * partitions currently in memory plus those already in flight.
   *
   * @return array of permissible IO actions for the scheduler
   */
  @Override
  public IOAction[] getNextIOActions() {
    int numPartitionsInMemory =
        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
    if (LOG.isInfoEnabled()) {
      LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
          " to be loaded");
    }
    // Effective partition count = resident partitions + in-flight delta
    // (loads count +1, stores count -1).
    int numPartitions =
        numPartitionsInMemory + deltaNumPartitionsInMemory.get();
    // Fixed out-of-core policy:
    //   - if the number of partitions in memory is less than the max number of
    //     partitions in memory, we should load a partition to memory. This
    //     basically means we are prefetching partition's data either for the
    //     current superstep, or for the next superstep.
    //   - if the number of partitions in memory is equal to the max number
    //     of partitions in memory, we do a 'soft store', meaning, we store
    //     processed partition to disk only if there is an unprocessed partition
    //     on disk. This basically makes room for unprocessed partitions on disk
    //     to be prefetched.
    //   - if the number of partitions in memory is more than the max number of
    //     partitions in memory, we do a 'hard store', meaning we store a
    //     partition to disk, regardless of its processing state.
    if (numPartitions < maxPartitionsInMemory) {
      return new IOAction[]{
        IOAction.LOAD_PARTITION,
        IOAction.STORE_MESSAGES_AND_BUFFERS};
    } else if (numPartitions > maxPartitionsInMemory) {
      // Over budget — this should normally be prevented by approve().
      LOG.warn("getNextIOActions: number of partitions in memory passed the " +
          "specified threshold!");
      return new IOAction[]{
        IOAction.STORE_PARTITION,
        IOAction.STORE_MESSAGES_AND_BUFFERS};
    } else {
      return new IOAction[]{
        IOAction.STORE_MESSAGES_AND_BUFFERS,
        IOAction.LOAD_TO_SWAP_PARTITION};
    }
  }

  /**
   * Approve or deny a proposed IO command. Uses a speculative
   * increment/decrement on the delta counter: the counter is bumped as part of
   * evaluating the condition, and rolled back immediately if the command is
   * denied; if approved, the bump stays until {@link #commandCompleted}
   * reverses it.
   *
   * @param command IO command to approve/deny
   * @return true if the command may proceed
   */
  @Override
  public boolean approve(IOCommand command) {
    int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
        .getNumInMemoryPartitions();
    // If loading a partition result in having more partition in memory, the
    // command should be denied. Also, if number of partitions in memory is
    // already less than the max number of partitions, any command for storing
    // a partition should be denied.
    if (command instanceof LoadPartitionIOCommand &&
        numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
        maxPartitionsInMemory) {
      // Denied: roll back the speculative increment.
      deltaNumPartitionsInMemory.getAndDecrement();
      return false;

    } else if (command instanceof StorePartitionIOCommand &&
        numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
        maxPartitionsInMemory) {
      // Denied: roll back the speculative decrement.
      deltaNumPartitionsInMemory.getAndIncrement();
      return false;
    }
    return true;
  }

  /**
   * Settle the in-flight delta once a load/store command finishes (reverses
   * the bump applied when the command was approved).
   *
   * @param command completed IO command
   */
  @Override
  public void commandCompleted(IOCommand command) {
    if (command instanceof LoadPartitionIOCommand) {
      deltaNumPartitionsInMemory.getAndDecrement();
    } else if (command instanceof StorePartitionIOCommand) {
      deltaNumPartitionsInMemory.getAndIncrement();
    }
  }

  /** No-op: the fixed-partition policy does not react to GC events. */
  @Override
  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }

  /** No-op: no resources to release on shutdown. */
  @Override
  public void shutdown() { }
}
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index bc0ece4..d4c2de5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -18,14 +18,26 @@ package org.apache.giraph.ooc; +import com.sun.management.GarbageCollectionNotificationInfo; +import com.yammer.metrics.util.PercentGauge; +import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.ServerData; +import org.apache.giraph.comm.flow_control.FlowControl; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.ooc.data.MetaPartitionManager; import org.apache.giraph.ooc.io.IOCommand; +import org.apache.giraph.ooc.io.LoadPartitionIOCommand; +import org.apache.giraph.utils.AdjustableSemaphore; +import org.apache.giraph.worker.BspServiceWorker; import org.apache.log4j.Logger; -import java.util.concurrent.atomic.AtomicBoolean; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -34,24 +46,41 @@ import static com.google.common.base.Preconditions.checkState; /** * Class to represent an out-of-core engine. 
*/ -public abstract class OutOfCoreEngine { +public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { + /** + * Number of 'units of processing' after which an active thread should + * check-in with the out-of-core engine in order to re-claim its permission to + * stay active. For a compute thread, the 'unit of processing' is processing + * of one vertex, and for an input thread, the 'unit of processing' is reading + * a row of input data. + */ + public static final int CHECK_IN_INTERVAL = (1 << 10) - 1; + /** Name of metric for percentage of graph on disk */ + public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%"; /** Class logger. */ private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class); + /** + * When getting partitions, how many milliseconds to wait if no partition was + * available in memory + */ + private static final long MSEC_TO_WAIT = 10000; /** Service worker */ - protected final CentralizedServiceWorker<?, ?, ?> service; + private final CentralizedServiceWorker<?, ?, ?> service; + /** Flow control used in sending requests */ + private FlowControl flowControl; /** Scheduler for IO threads */ - protected OutOfCoreIOScheduler ioScheduler; + private final OutOfCoreIOScheduler ioScheduler; /** Data structure to keep meta partition information */ - protected final MetaPartitionManager metaPartitionManager; - /** How many disk (i.e. IO threads) do we have? 
*/ - protected final int numIOThreads; + private final MetaPartitionManager metaPartitionManager; + /** Out-of-core oracle (brain of out-of-core mechanism) */ + private final OutOfCoreOracle oracle; /** * Whether the job should fail due to IO threads terminating because of * exceptions */ - protected volatile boolean jobFailed = false; - /** Whether the out-of-core engine has initialized */ - private final AtomicBoolean isInitialized = new AtomicBoolean(false); + private volatile boolean jobFailed = false; + /** IO statistics collector */ + private final OutOfCoreIOStatistics statistics; /** * Global lock for entire superstep. This lock helps to avoid overlapping of * out-of-core decisions (what to do next to help the out-of-core mechanism) @@ -60,6 +89,56 @@ public abstract class OutOfCoreEngine { private final ReadWriteLock superstepLock = new ReentrantReadWriteLock(); /** Callable factory for IO threads */ private final OutOfCoreIOCallableFactory oocIOCallableFactory; + /** + * Dummy object to wait on until a partition becomes available in memory + * for processing + */ + private final Object partitionAvailable = new Object(); + /** How many compute threads do we have? */ + private int numComputeThreads; + /** How many threads (input/compute) are processing data? */ + private volatile int numProcessingThreads; + /** Semaphore used for controlling number of active threads at each moment */ + private final AdjustableSemaphore activeThreadsPermit; + /** + * Generally, the logic in Giraph for change of the superstep happens in the + * following order: + * (1) Compute threads are done processing all partitions + * (2) Superstep number increases + * (3) New message store is created and message stores are prepared + * (4) Iteration over partitions starts + * Note that there are other operations happening at the same time as well as + * the above operations, but the above operations are the ones which may + * interfere with out-of-core operations. 
The goal of `superstepLock` is to + * isolate operations 2, 3, and 4 from the rest of computations and IO + * operations. Specifically, increasing the superstep counter (operation 2) + * should be exclusive and no IO operation should happen at the same time. + * This is due to the fact that prefetching mechanism uses superstep counter + * as a mean to identify which data should be read. That being said, superstep + * counter should be cached in out-of-core engine, and all IO operations and + * out-of-core logic should access superstep counter through this cached + * value. + */ + private long superstep; + /** + * Generally, the logic of a graph computations happens in the following order + * with respect to `startIteration` and `reset` method: + * ... + * startIteration (for moving edges) + * ... + * reset (to prepare messages/partitions for superstep 0) + * ... + * startIteration (superstep 0) + * ... + * reset (to prepare messages/partitions for superstep 1) + * ... + * + * However, in the unit tests, we usually consider only one superstep (usually + * INPUT_SUPERSTEP), and we move through partitions multiple times. Out-of- + * core mechanism works only if partitions are reset in a proper way. So, + * we keep the following flag to reset partitions if necessary. + */ + private boolean resetDone; /** * Constructor @@ -71,21 +150,45 @@ public abstract class OutOfCoreEngine { CentralizedServiceWorker<?, ?, ?> service) { this.service = service; this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this); - this.numIOThreads = oocIOCallableFactory.getNumDisks(); + /* How many disk (i.e. IO threads) do we have? 
*/ + int numIOThreads = oocIOCallableFactory.getNumDisks(); + this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads); this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this); - oocIOCallableFactory.createCallable(); + this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads); + int maxPartitionsInMemory = + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf); + Class<? extends OutOfCoreOracle> oracleClass = + GiraphConstants.OUT_OF_CORE_ORACLE.get(conf); + if (maxPartitionsInMemory != 0 && + oracleClass != FixedPartitionsOracle.class) { + LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " + + "but the out-of-core oracle used is not tailored for fixed " + + "out-of-core policy. Setting the oracle to be FixedPartitionsOracle"); + oracleClass = FixedPartitionsOracle.class; + } + try { + Constructor<?> constructor = oracleClass.getConstructor( + ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class); + this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this); + } catch (NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { + throw new IllegalStateException("OutOfCoreEngine: caught exception " + + "while creating the oracle!", e); + } + this.numComputeThreads = conf.getNumComputeThreads(); + // At the beginning of the execution, only input threads are processing data + this.numProcessingThreads = conf.getNumInputSplitsThreads(); + this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads); + this.superstep = BspService.INPUT_SUPERSTEP; + this.resetDone = false; + GiraphMetrics.get().addSuperstepResetObserver(this); } /** * Initialize/Start the out-of-core engine. 
*/ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - "JLM_JSR166_UTILCONCURRENT_MONITORENTER") public void initialize() { - synchronized (isInitialized) { - isInitialized.set(true); - isInitialized.notifyAll(); - } + oocIOCallableFactory.createCallable(); } /** @@ -123,20 +226,7 @@ public abstract class OutOfCoreEngine { * * @return OutOfCoreIOScheduler */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - "JLM_JSR166_UTILCONCURRENT_MONITORENTER") public OutOfCoreIOScheduler getIOScheduler() { - synchronized (isInitialized) { - while (!isInitialized.get()) { - try { - isInitialized.wait(); - } catch (InterruptedException e) { - throw new IllegalStateException("getIOScheduler: " + - "InterruptedException while waiting for out-of-core engine to " + - "be initialized!"); - } - } - } return ioScheduler; } @@ -146,12 +236,11 @@ public abstract class OutOfCoreEngine { * @return MetaPartitionManager */ public MetaPartitionManager getMetaPartitionManager() { - checkState(isInitialized.get()); return metaPartitionManager; } /** - * Get a refernce to superstep lock + * Get a reference to superstep lock * * @return read/write lock used for global superstep lock */ @@ -160,6 +249,24 @@ public abstract class OutOfCoreEngine { } /** + * Get a reference to IO statistics collector + * + * @return IO statistics collector + */ + public OutOfCoreIOStatistics getIOStatistics() { + return statistics; + } + + /** + * Get a reference to out-of-core oracle + * + * @return out-of-core oracle + */ + public OutOfCoreOracle getOracle() { + return oracle; + } + + /** * Get the id of the next partition to process in the current iteration over * all the partitions. If all partitions are already processed, this method * returns null. @@ -167,20 +274,78 @@ public abstract class OutOfCoreEngine { * @return id of a partition to process. 'null' if all partitions are * processed in current iteration over partitions. 
 */
  public Integer getNextPartition() {
    Integer partitionId;
    synchronized (partitionAvailable) {
      // Block (with a bounded wait) until the meta partition manager hands
      // out a partition id, the iteration ends, or the job fails.
      while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
        try {
          if (LOG.isInfoEnabled()) {
            LOG.info("getNextPartition: waiting until a partition becomes " +
                "available!");
          }
          // Timed wait (MSEC_TO_WAIT) so this thread can notice `jobFailed`
          // even if no notification ever arrives.
          partitionAvailable.wait(MSEC_TO_WAIT);
        } catch (InterruptedException e) {
          throw new IllegalStateException("getNextPartition: caught " +
              "InterruptedException while waiting to retrieve a partition to " +
              "process");
        }
        if (jobFailed) {
          throw new RuntimeException("Job Failed due to a failure in an " +
              "out-of-core IO thread!");
        }
      }
      if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
        // Sentinel meaning every partition has been processed this iteration.
        // Wake the other waiters so they observe the end of iteration too,
        // and report 'null' per this method's contract.
        partitionAvailable.notifyAll();
        partitionId = null;
      }
    }
    return partitionId;
  }

  /**
   * Notify out-of-core engine that processing of a particular partition is done
   *
   * @param partitionId id of the partition that its processing is done
   */
  public void doneProcessingPartition(int partitionId) {
    metaPartitionManager.setPartitionIsProcessed(partitionId);
    if (LOG.isInfoEnabled()) {
      LOG.info("doneProcessingPartition: processing partition " + partitionId +
          " is done!");
    }
  }

  /**
   * Notify out-of-core engine that iteration cycle over all partitions is about
   * to begin.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
  public void startIteration() {
    if (!resetDone) {
      // Unit tests may iterate over partitions repeatedly without calling
      // reset(); make sure partition state is reset under the superstep
      // write lock so no IO decision overlaps the reset.
      superstepLock.writeLock().lock();
      metaPartitionManager.resetPartitions();
      superstepLock.writeLock().unlock();
    }
    if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
        numProcessingThreads != numComputeThreads) {
      // This method is only executed by the main thread, and at this point
      // no other input/compute thread is alive. So, all the permits in
      // `activeThreadsPermit` are available. However, now that we are changing
      // the maximum number of active threads, we need to adjust the number
      // of available permits on `activeThreadsPermit` (scaled
      // proportionally, from input-thread count to compute-thread count).
      activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() *
          numComputeThreads / numProcessingThreads);
      numProcessingThreads = numComputeThreads;
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("startIteration: with " +
          metaPartitionManager.getNumInMemoryPartitions() +
          " partitions in memory and " +
          activeThreadsPermit.availablePermits() + " active threads");
    }
    resetDone = false;
  }

  /**
   * Retrieve a particular partition.
After this method is complete the
   *
   * @param partitionId id of the partition to retrieve
   */
  public void retrievePartition(int partitionId) {
    if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
      // Schedule a load for the current superstep and block until an IO
      // thread reports the partition as resident in memory.
      ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
          superstep));
      synchronized (partitionAvailable) {
        while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
          try {
            if (LOG.isInfoEnabled()) {
              LOG.info("retrievePartition: waiting until partition " +
                  partitionId + " becomes available");
            }
            // Woken by ioCommandCompleted() when a load finishes; re-check
            // the on-disk state in case the wakeup was for another partition.
            partitionAvailable.wait();
          } catch (InterruptedException e) {
            throw new IllegalStateException("retrievePartition: caught " +
                "InterruptedException while waiting to retrieve partition " +
                partitionId);
          }
        }
      }
    }
  }

  /**
   * Notify out-of-core engine that an IO command is completed by an IO thread
   *
   * @param command the IO command that is completed
   */
  public void ioCommandCompleted(IOCommand command) {
    // Let the oracle update its bookkeeping first.
    oracle.commandCompleted(command);
    if (command instanceof LoadPartitionIOCommand) {
      // Notifying compute threads who are waiting for a partition to become
      // available in memory to process.
      synchronized (partitionAvailable) {
        partitionAvailable.notifyAll();
      }
    }
  }

  /**
   * Set a flag to fail the job.
   */
  public void failTheJob() {
    jobFailed = true;
  }

  /**
   * Update the fraction of processing threads that should remain active. It is
   * the responsibility of out-of-core oracle to update the number of active
   * threads.
   *
   * @param fraction the fraction of processing threads to remain active.
This + * number is in range [0, 1] + */ + public void updateActiveThreadsFraction(double fraction) { + checkState(fraction >= 0 && fraction <= 1); + int numActiveThreads = (int) (numProcessingThreads * fraction); + if (LOG.isInfoEnabled()) { + LOG.info("updateActiveThreadsFraction: updating the number of active " + + "threads to " + numActiveThreads); + } + activeThreadsPermit.setMaxPermits(numActiveThreads); + } + + /** + * A processing thread would check in with out-of-core engine every once in a + * while to make sure that it can still remain active. It is the + * responsibility of the out-of-core oracle to update the number of active + * threads in a way that the computation never fails, and yet achieve the + * optimal performance it can achieve. + */ + public void activeThreadCheckIn() { + activeThreadsPermit.release(); + try { + activeThreadsPermit.acquire(); + } catch (InterruptedException e) { + LOG.error("activeThreadCheckIn: exception while acquiring a permit to " + + "remain an active thread"); + throw new IllegalStateException(e); + } + } + + /** + * Notify the out-of-core engine that a processing (input/compute) thread has + * started. + */ + public void processingThreadStart() { + try { + activeThreadsPermit.acquire(); + } catch (InterruptedException e) { + LOG.error("processingThreadStart: exception while acquiring a permit to" + + " start the processing thread!"); + throw new IllegalStateException(e); + } + } + + /** + * Notify the out-of-core engine that a processing (input/compute) thread has + * finished. + */ + public void processingThreadFinish() { + activeThreadsPermit.release(); + } + + /** + * Reset partitions and messages meta data. Also, reset the cached value of + * superstep counter. 
+ */ + public void reset() { + metaPartitionManager.resetPartitions(); + metaPartitionManager.resetMessages(); + superstep = service.getSuperstep(); + resetDone = true; + } + + /** + * @return cached value of the superstep counter + */ + public long getSuperstep() { + return superstep; + } + + /** + * Notify the out-of-core engine that a GC has just been completed + * + * @param info GC information + */ + public void gcCompleted(GarbageCollectionNotificationInfo info) { + oracle.gcCompleted(info); + } + + @Override + public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { + superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new PercentGauge() { + @Override + protected double getNumerator() { + return metaPartitionManager.getNumInMemoryPartitions(); + } + + @Override + protected double getDenominator() { + return metaPartitionManager.getNumPartitions(); + } + }); + } + + public FlowControl getFlowControl() { + return flowControl; + } + + public void setFlowControl(FlowControl flowControl) { + this.flowControl = flowControl; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java index 6c6e4ff..962bd6a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java @@ -18,7 +18,13 @@ package org.apache.giraph.ooc; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Histogram; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.ooc.io.IOCommand; +import 
org.apache.giraph.ooc.io.LoadPartitionIOCommand; import org.apache.giraph.ooc.io.WaitIOCommand; import org.apache.log4j.Logger; @@ -27,7 +33,16 @@ import java.util.concurrent.Callable; /** * IO threads for out-of-core mechanism. */ -public class OutOfCoreIOCallable implements Callable<Void> { +public class OutOfCoreIOCallable implements Callable<Void>, + ResetSuperstepMetricsObserver { + /** Name of Metric for number of bytes read from disk */ + public static final String BYTES_LOAD_FROM_DISK = "ooc-bytes-load"; + /** Name of Metric for number of bytes written to disk */ + public static final String BYTES_STORE_TO_DISK = "ooc-bytes-store"; + /** Name of Metric for size of loads */ + public static final String HISTOGRAM_LOAD_SIZE = "ooc-load-size-bytes"; + /** Name of Metric for size of stores */ + public static final String HISTOGRAM_STORE_SIZE = "ooc-store-size-bytes"; /** Class logger. */ private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class); /** Out-of-core engine */ @@ -36,6 +51,14 @@ public class OutOfCoreIOCallable implements Callable<Void> { private final String basePath; /** Thread id/Disk id */ private final int diskId; + /** How many bytes of data is read from disk */ + private Counter bytesReadPerSuperstep; + /** How many bytes of data is written to disk */ + private Counter bytesWrittenPerSuperstep; + /** Size of load IO commands */ + private Histogram histogramLoadSize; + /** Size of store IO commands */ + private Histogram histogramStoreSize; /** * Constructor @@ -49,10 +72,12 @@ public class OutOfCoreIOCallable implements Callable<Void> { this.oocEngine = oocEngine; this.basePath = basePath; this.diskId = diskId; + newSuperstep(GiraphMetrics.get().perSuperstep()); + GiraphMetrics.get().addSuperstepResetObserver(this); } @Override - public Void call() { + public Void call() throws Exception { while (true) { oocEngine.getSuperstepLock().readLock().lock(); IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId); 
@@ -67,17 +92,44 @@ public class OutOfCoreIOCallable implements Callable<Void> { if (command instanceof WaitIOCommand) { oocEngine.getSuperstepLock().readLock().unlock(); } + + boolean commandExecuted = false; + long duration = 0; + long bytes; // CHECKSTYLE: stop IllegalCatch try { - command.execute(basePath); + long startTime = System.currentTimeMillis(); + commandExecuted = command.execute(basePath); + duration = System.currentTimeMillis() - startTime; + bytes = command.bytesTransferred(); + if (LOG.isInfoEnabled()) { + LOG.info("call: thread " + diskId + "'s command " + command + + " completed: bytes= " + bytes + ", duration=" + duration + ", " + + "bandwidth=" + String.format("%.2f", (double) bytes / duration * + 1000 / 1024 / 1024)); + } } catch (Exception e) { oocEngine.failTheJob(); - LOG.info("call: execution of IO command " + command + " failed!"); + LOG.error("call: execution of IO command " + command + " failed!"); throw new RuntimeException(e); } // CHECKSTYLE: resume IllegalCatch if (!(command instanceof WaitIOCommand)) { oocEngine.getSuperstepLock().readLock().unlock(); + if (bytes != 0) { + if (command instanceof LoadPartitionIOCommand) { + bytesReadPerSuperstep.inc(bytes); + histogramLoadSize.update(bytes); + } else { + bytesWrittenPerSuperstep.inc(bytes); + histogramStoreSize.update(bytes); + } + } + } + + if (commandExecuted && duration > 0) { + oocEngine.getIOStatistics().update(command.getType(), + command.bytesTransferred(), duration); } oocEngine.getIOScheduler().ioCommandCompleted(command); } @@ -86,5 +138,16 @@ public class OutOfCoreIOCallable implements Callable<Void> { } return null; } + + @Override + public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { + bytesReadPerSuperstep = superstepMetrics.getCounter(BYTES_LOAD_FROM_DISK); + bytesWrittenPerSuperstep = + superstepMetrics.getCounter(BYTES_STORE_TO_DISK); + histogramLoadSize = + superstepMetrics.getUniformHistogram(HISTOGRAM_LOAD_SIZE); + histogramStoreSize = + 
superstepMetrics.getUniformHistogram(HISTOGRAM_STORE_SIZE); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java index dee632d..6428c30 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java @@ -22,12 +22,25 @@ import com.google.common.hash.Hashing; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.IntConfOption; import org.apache.giraph.ooc.io.IOCommand; +import org.apache.giraph.ooc.io.LoadPartitionIOCommand; +import org.apache.giraph.ooc.io.StoreDataBufferIOCommand; +import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand; +import org.apache.giraph.ooc.io.StorePartitionIOCommand; +import org.apache.giraph.ooc.io.WaitIOCommand; import org.apache.log4j.Logger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static com.google.common.base.Preconditions.checkNotNull; + /** * Representation of IO thread scheduler for out-of-core mechanism */ -public abstract class OutOfCoreIOScheduler { +public class OutOfCoreIOScheduler { /** * If an IO thread does not have any command to do, it waits for certain a * period and check back again to see if there exist any command to perform. 
@@ -41,11 +54,18 @@ public abstract class OutOfCoreIOScheduler { private static final Logger LOG = Logger.getLogger(OutOfCoreIOScheduler.class); /** Out-of-core engine */ - protected final OutOfCoreEngine oocEngine; + private final OutOfCoreEngine oocEngine; /** How much an IO thread should wait if there is no IO command */ - protected final int waitInterval; + private final int waitInterval; /** How many disks (i.e. IO threads) do we have? */ private final int numDisks; + /** + * Queue of IO commands for loading partitions to memory. Load commands are + * urgent and should be done once loading data is a viable IO command. + */ + private final List<Queue<IOCommand>> threadLoadCommandQueue; + /** Whether IO threads should terminate */ + private volatile boolean shouldTerminate; /** * Constructor @@ -59,6 +79,12 @@ public abstract class OutOfCoreIOScheduler { this.oocEngine = oocEngine; this.numDisks = numDisks; this.waitInterval = OOC_WAIT_INTERVAL.get(conf); + threadLoadCommandQueue = new ArrayList<>(numDisks); + for (int i = 0; i < numDisks; ++i) { + threadLoadCommandQueue.add( + new ConcurrentLinkedQueue<IOCommand>()); + } + shouldTerminate = false; } /** @@ -78,26 +104,170 @@ public abstract class OutOfCoreIOScheduler { * @param threadId id of the thread ready to execute the next IO command * @return next IO command to be executed by the given thread */ - public abstract IOCommand getNextIOCommand(int threadId); + public IOCommand getNextIOCommand(int threadId) { + if (shouldTerminate) { + return null; + } + IOCommand command = null; + do { + if (command != null && LOG.isInfoEnabled()) { + LOG.info("getNextIOCommand: command " + command + " was proposed to " + + "the oracle, but got denied. 
Generating another command!"); + } + OutOfCoreOracle.IOAction[] actions = + oocEngine.getOracle().getNextIOActions(); + if (LOG.isInfoEnabled()) { + LOG.info("getNextIOCommand: actions are " + Arrays.toString(actions)); + } + // Check whether there are any urgent outstanding load requests + if (!threadLoadCommandQueue.get(threadId).isEmpty()) { + // Check whether loading a partition is a viable (allowed) action to do + boolean canLoad = false; + for (OutOfCoreOracle.IOAction action : actions) { + if (action == OutOfCoreOracle.IOAction.LOAD_PARTITION || + action == OutOfCoreOracle.IOAction.LOAD_UNPROCESSED_PARTITION || + action == OutOfCoreOracle.IOAction.LOAD_TO_SWAP_PARTITION || + action == OutOfCoreOracle.IOAction.URGENT_LOAD_PARTITION) { + canLoad = true; + break; + } + } + if (canLoad) { + command = threadLoadCommandQueue.get(threadId).poll(); + checkNotNull(command); + if (oocEngine.getOracle().approve(command)) { + return command; + } else { + // Loading is not viable at this moment. We should put the command + // back in the load queue and wait until loading becomes viable. 
+ threadLoadCommandQueue.get(threadId).offer(command); + } + } + } + command = null; + for (OutOfCoreOracle.IOAction action : actions) { + Integer partitionId; + switch (action) { + case STORE_MESSAGES_AND_BUFFERS: + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadPartitionBufferId(threadId); + if (partitionId != null) { + command = new StoreDataBufferIOCommand(oocEngine, partitionId, + StoreDataBufferIOCommand.DataBufferType.PARTITION); + } else { + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadMessageBufferId(threadId); + if (partitionId != null) { + command = new StoreDataBufferIOCommand(oocEngine, partitionId, + StoreDataBufferIOCommand.DataBufferType.MESSAGE); + } else { + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadMessageId(threadId); + if (partitionId != null) { + command = new StoreIncomingMessageIOCommand(oocEngine, + partitionId); + } + } + } + break; + case STORE_PROCESSED_PARTITION: + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadPartitionId(threadId); + if (partitionId != null && + oocEngine.getMetaPartitionManager() + .isPartitionProcessed(partitionId)) { + command = new StorePartitionIOCommand(oocEngine, partitionId); + } + break; + case STORE_PARTITION: + partitionId = oocEngine.getMetaPartitionManager() + .getOffloadPartitionId(threadId); + if (partitionId != null) { + command = new StorePartitionIOCommand(oocEngine, partitionId); + } + break; + case LOAD_UNPROCESSED_PARTITION: + partitionId = oocEngine.getMetaPartitionManager() + .getLoadPartitionId(threadId); + if (partitionId != null && + !oocEngine.getMetaPartitionManager() + .isPartitionProcessed(partitionId)) { + command = new LoadPartitionIOCommand(oocEngine, partitionId, + oocEngine.getSuperstep()); + } + break; + case LOAD_TO_SWAP_PARTITION: + partitionId = oocEngine.getMetaPartitionManager() + .getLoadPartitionId(threadId); + if (partitionId != null && + !oocEngine.getMetaPartitionManager() + 
.isPartitionProcessed(partitionId) && + oocEngine.getMetaPartitionManager().hasProcessedOnMemory()) { + command = new LoadPartitionIOCommand(oocEngine, partitionId, + oocEngine.getSuperstep()); + } + break; + case LOAD_PARTITION: + partitionId = oocEngine.getMetaPartitionManager() + .getLoadPartitionId(threadId); + if (partitionId != null) { + if (oocEngine.getMetaPartitionManager() + .isPartitionProcessed(partitionId)) { + command = new LoadPartitionIOCommand(oocEngine, partitionId, + oocEngine.getSuperstep() + 1); + } else { + command = new LoadPartitionIOCommand(oocEngine, partitionId, + oocEngine.getSuperstep()); + } + } + break; + case URGENT_LOAD_PARTITION: + // Do nothing + break; + default: + throw new IllegalStateException("getNextIOCommand: the IO action " + + "is not defined!"); + } + if (command != null) { + break; + } + } + if (command == null) { + command = new WaitIOCommand(oocEngine, waitInterval); + } + } while (!oocEngine.getOracle().approve(command)); + return command; + } /** * Notify IO scheduler that the IO command is completed * * @param command completed command */ - public abstract void ioCommandCompleted(IOCommand command); + public void ioCommandCompleted(IOCommand command) { + oocEngine.ioCommandCompleted(command); + } /** * Add an IO command to the scheduling queue of the IO scheduler * * @param ioCommand IO command to add to the scheduler */ - public abstract void addIOCommand(IOCommand ioCommand); + public void addIOCommand(IOCommand ioCommand) { + int ownerThread = getOwnerThreadId(ioCommand.getPartitionId()); + if (ioCommand instanceof LoadPartitionIOCommand) { + threadLoadCommandQueue.get(ownerThread).offer(ioCommand); + } else { + throw new IllegalStateException("addIOCommand: IO command type is not " + + "supported for addition"); + } + } /** * Shutdown/Terminate the IO scheduler, and notify all IO threads to halt */ public void shutdown() { + shouldTerminate = true; if (LOG.isInfoEnabled()) { LOG.info("shutdown: 
OutOfCoreIOScheduler shutting down!"); } http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java new file mode 100644 index 0000000..a225a4c --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import com.google.common.collect.Maps; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.ooc.io.IOCommand.IOCommandType; +import org.apache.log4j.Logger; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Class to collect statistics regarding IO operations done in out-of-core + * mechanism. 
+ */ +public class OutOfCoreIOStatistics { + /** + * An estimate of disk bandwidth. This number is only used just at the start + * of the computation, and will be updated/replaced later on once a few disk + * operations happen. + */ + public static final IntConfOption DISK_BANDWIDTH_ESTIMATE = + new IntConfOption("giraph.diskBandwidthEstimate", 125, + "An estimate of disk bandwidth (MB/s). This number is used just at " + + "the beginning of the computation, and it will be " + + "updated/replaced once a few disk operations happen."); + /** + * How many recent IO operations should we keep track of? Any report given out + * of this statistics collector is only based on most recent IO operations. + */ + public static final IntConfOption IO_COMMAND_HISTORY_SIZE = + new IntConfOption("giraph.ioCommandHistorySize", 50, + "Number of most recent IO operations to consider for reporting the" + + "statistics."); + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(OutOfCoreIOStatistics.class); + /** Estimate of disk bandwidth (bytes/second) */ + private final AtomicLong diskBandwidthEstimate; + /** Cached value for IO_COMMAND_HISTORY_SIZE */ + private final int maxHistorySize; + /** + * Coefficient/Weight of the most recent IO operation toward the disk + * bandwidth estimate. Basically if the disk bandwidth estimate if d, and the + * latest IO command happened at the rate of r, the new estimate of disk + * bandwidth is calculated as: + * d_new = updateCoefficient * r + (1 - updateCoefficient) * d + */ + private final double updateCoefficient; + /** Queue of all recent commands */ + private final Queue<StatisticsEntry> commandHistory; + /** + * Command statistics for each type of IO command. This is the statistics of + * the recent commands in the history we keep track of (with 'maxHistorySize' + * command in the history). + */ + private final Map<IOCommandType, StatisticsEntry> aggregateStats; + /** How many IO command completed? 
*/ + private int numUpdates = 0; + + /** + * Constructor + * + * @param conf configuration + * @param numIOThreads number of disks/IO threads + */ + public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf, + int numIOThreads) { + this.diskBandwidthEstimate = + new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) * + (long) GiraphConstants.ONE_MB); + this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf); + this.updateCoefficient = 1.0 / maxHistorySize; + // Adding more entry to the capacity of the queue to have a wiggle room + // if all IO threads are adding/removing entries from the queue + this.commandHistory = + new ArrayBlockingQueue<>(maxHistorySize + numIOThreads); + this.aggregateStats = Maps.newConcurrentMap(); + for (IOCommandType type : IOCommandType.values()) { + aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0)); + } + } + + /** + * Update statistics with the last IO command that is executed. + * + * @param type type of the IO command that is executed + * @param bytesTransferred number of bytes transferred in the last IO command + * @param duration duration it took for the last IO command to complete + * (milliseconds) + */ + public void update(IOCommandType type, long bytesTransferred, + long duration) { + StatisticsEntry entry = aggregateStats.get(type); + synchronized (entry) { + entry.setOccurrence(entry.getOccurrence() + 1); + entry.setDuration(duration + entry.getDuration()); + entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred()); + } + commandHistory.offer( + new StatisticsEntry(type, bytesTransferred, duration, 0)); + if (type != IOCommandType.WAIT) { + // If the current estimate is 'd', the new rate is 'r', and the size of + // history is 'n', we can simply model all the past command's rate as: + // d, d, d, ..., d, r + // where 'd' happens for 'n-1' times. 
Hence the new estimate of the + // bandwidth would be: + // d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r + // where updateCoefficient = 1/n + diskBandwidthEstimate.set((long) + (updateCoefficient * (bytesTransferred / duration * 1000) + + (1 - updateCoefficient) * diskBandwidthEstimate.get())); + } + if (commandHistory.size() > maxHistorySize) { + StatisticsEntry removedEntry = commandHistory.poll(); + entry = aggregateStats.get(removedEntry.getType()); + synchronized (entry) { + entry.setOccurrence(entry.getOccurrence() - 1); + entry.setDuration(entry.getDuration() - removedEntry.getDuration()); + entry.setBytesTransferred( + entry.getBytesTransferred() - removedEntry.getBytesTransferred()); + } + } + numUpdates++; + // Outputting log every so many commands + if (numUpdates % 10 == 0) { + if (LOG.isInfoEnabled()) { + LOG.info(this); + } + } + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + long waitTime = 0; + long loadTime = 0; + long bytesRead = 0; + long storeTime = 0; + long bytesWritten = 0; + for (Map.Entry<IOCommandType, StatisticsEntry> entry : + aggregateStats.entrySet()) { + synchronized (entry.getValue()) { + sb.append(entry.getKey() + ": " + entry.getValue() + ", "); + if (entry.getKey() == IOCommandType.WAIT) { + waitTime += entry.getValue().getDuration(); + } else if (entry.getKey() == IOCommandType.LOAD_PARTITION) { + loadTime += entry.getValue().getDuration(); + bytesRead += entry.getValue().getBytesTransferred(); + } else { + storeTime += entry.getValue().getDuration(); + bytesWritten += entry.getValue().getBytesTransferred(); + } + } + } + sb.append(String.format("Average STORE: %.2f MB/s, ", + (double) bytesWritten / storeTime * 1000 / 1024 / 1024)); + sb.append(String.format("DATA_INJECTION: %.2f MB/s, ", + (double) (bytesRead - bytesWritten) / + (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024)); + sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s", + (double) diskBandwidthEstimate.get() / 1024 
/ 1024)); + + return sb.toString(); + } + + /** + * @return most recent estimate of the disk bandwidth + */ + public long getDiskBandwidth() { + return diskBandwidthEstimate.get(); + } + + /** + * Get aggregate statistics of a given command type in the command history + * + * @param type type of the command to get the statistics for + * @return aggregate statistics for the given command type + */ + public BytesDuration getCommandTypeStats(IOCommandType type) { + StatisticsEntry entry = aggregateStats.get(type); + synchronized (entry) { + return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(), + entry.getOccurrence()); + } + } + + /** + * Helper class to return results of statistics collector for a certain + * command type + */ + public static class BytesDuration { + /** Number of bytes transferred in a few commands of the same type */ + private long bytes; + /** Duration of it took to execute a few commands of the same type */ + private long duration; + /** Number of commands executed of the same type */ + private int occurrence; + + /** + * Constructor + * @param bytes number of bytes transferred + * @param duration duration it took to execute commands + * @param occurrence number of commands + */ + BytesDuration(long bytes, long duration, int occurrence) { + this.bytes = bytes; + this.duration = duration; + this.occurrence = occurrence; + } + + /** + * @return number of bytes transferred for the same command type + */ + public long getBytes() { + return bytes; + } + + /** + * @return duration it took to execute a few commands of the same type + */ + public long getDuration() { + return duration; + } + + /** + * @return number of commands that are executed of the same type + */ + public int getOccurrence() { + return occurrence; + } + } + + /** + * Helper class to keep statistics for a certain command type + */ + private static class StatisticsEntry { + /** Type of the command */ + private IOCommandType type; + /** + * Aggregate number of bytes 
transferred executing the particular command + * type in the history we keep + */ + private long bytesTransferred; + /** + * Aggregate duration it took executing the particular command type in the + * history we keep + */ + private long duration; + /** + * Number of occurrences of the particular command type in the history we + * keep + */ + private int occurrence; + + /** + * Constructor + * + * @param type type of the command + * @param bytesTransferred aggregate number of bytes transferred + * @param duration aggregate execution time + * @param occurrence number of occurrences of the particular command type + */ + public StatisticsEntry(IOCommandType type, long bytesTransferred, + long duration, int occurrence) { + this.type = type; + this.bytesTransferred = bytesTransferred; + this.duration = duration; + this.occurrence = occurrence; + } + + /** + * @return type of the command + */ + public IOCommandType getType() { + return type; + } + + /** + * @return aggregate number of bytes transferred in the particular command + * type + */ + public long getBytesTransferred() { + return bytesTransferred; + } + + /** + * Update the aggregate number of bytes transferred + * + * @param bytesTransferred aggregate number of bytes + */ + public void setBytesTransferred(long bytesTransferred) { + this.bytesTransferred = bytesTransferred; + } + + /** + * @return aggregate duration it took to execute the particular command type + */ + public long getDuration() { + return duration; + } + + /** + * Update the aggregate duration + * + * @param duration aggregate duration + */ + public void setDuration(long duration) { + this.duration = duration; + } + + /** + * @return number of occurrences of the particular command type + */ + public int getOccurrence() { + return occurrence; + } + + /** + * Update the number of occurrences of the particular command type + * + * @param occurrence number of occurrences + */ + public void setOccurrence(int occurrence) { + this.occurrence = 
occurrence; + } + + @Override + public String toString() { + if (type == IOCommandType.WAIT) { + return String.format("%.2f sec", duration / 1000.0); + } else { + return String.format("%.2f MB/s", + (double) bytesTransferred / duration * 1000 / 1024 / 2014); + } + } + } +}
