Updated Branches: refs/heads/trunk 13c3aa116 -> 218f60236
GIRAPH-722: ProgressableUtils.waitForever is not calling progress (majakabiljo)

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/218f6023
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/218f6023
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/218f6023

Branch: refs/heads/trunk
Commit: 218f60236ff174852ce91384256d722d2ad48cf8
Parents: 13c3aa1
Author: Maja Kabiljo <[email protected]>
Authored: Wed Jul 24 15:18:39 2013 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Wed Jul 24 15:18:39 2013 -0700

----------------------------------------------------------------------
 CHANGELOG | 2 +
 .../apache/giraph/utils/ProgressableUtils.java | 39 +++++++++--
 .../giraph/utils/TestProgressableUtils.java | 70 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/218f6023/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6df7ad7..bc3e84d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+ GIRAPH-722: ProgressableUtils.waitForever is not calling progress (majakabiljo)
+
 GIRAPH-549: Tinkerpop/Blueprints/Rexter InputFormat (armax00 via claudio)
 
 GIRAPH-701: Communication improvement using one-to-all message

http://git-wip-us.apache.org/repos/asf/giraph/blob/218f6023/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 3b06604..78c230a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java @@ -41,7 +41,7 @@ public class ProgressableUtils { private static final Logger LOG = Logger.getLogger(ProgressableUtils.class); /** Msecs to refresh the progress meter (one minute) */ - private static final int MSEC_PERIOD = 60 * 1000; + private static final int DEFUALT_MSEC_PERIOD = 60 * 1000; /** Do not instantiate. */ private ProgressableUtils() { @@ -53,6 +53,20 @@ public class ProgressableUtils { * * @param executor Executor which we are waiting for * @param progressable Progressable for reporting progress (Job context) + * @param msecsPeriod How often to report progress + */ + public static void awaitExecutorTermination(ExecutorService executor, + Progressable progressable, int msecsPeriod) { + waitForever(new ExecutorServiceWaitable(executor), progressable, + msecsPeriod); + } + + /** + * Wait for executor tasks to terminate, while periodically reporting + * progress. + * + * @param executor Executor which we are waiting for + * @param progressable Progressable for reporting progress (Job context) */ public static void awaitExecutorTermination(ExecutorService executor, Progressable progressable) { @@ -107,8 +121,22 @@ public class ProgressableUtils { */ private static <T> T waitForever(Waitable<T> waitable, Progressable progressable) { + return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD); + } + + /** + * Wait forever for waitable to finish. Periodically reports progress. 
+ * + * @param waitable Waitable which we wait for + * @param progressable Progressable for reporting progress (Job context) + * @param msecsPeriod How often to report progress + * @param <T> Result type + * @return Result of waitable + */ + private static <T> T waitForever(Waitable<T> waitable, + Progressable progressable, int msecsPeriod) { while (true) { - waitFor(waitable, progressable, MSEC_PERIOD); + waitFor(waitable, progressable, msecsPeriod, msecsPeriod); if (waitable.isFinished()) { try { return waitable.getResult(); @@ -130,15 +158,17 @@ public class ProgressableUtils { * @param waitable Waitable which we wait for * @param progressable Progressable for reporting progress (Job context) * @param msecs Number of milliseconds to wait for + * @param msecsPeriod How often to report progress * @param <T> Result type * @return Result of waitable */ private static <T> T waitFor(Waitable<T> waitable, Progressable progressable, - int msecs) { + int msecs, int msecsPeriod) { long timeoutTimeMsecs = System.currentTimeMillis() + msecs; int currentWaitMsecs; while (true) { - currentWaitMsecs = Math.min(msecs, MSEC_PERIOD); + progressable.progress(); + currentWaitMsecs = Math.min(msecs, msecsPeriod); try { waitable.waitFor(currentWaitMsecs); if (waitable.isFinished()) { @@ -157,7 +187,6 @@ public class ProgressableUtils { if (System.currentTimeMillis() >= timeoutTimeMsecs) { return waitable.getTimeoutResult(); } - progressable.progress(); msecs = Math.max(0, msecs - currentWaitMsecs); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/218f6023/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java new file mode 100644 index 0000000..c7177a3 --- /dev/null +++ 
b/giraph-core/src/test/java/org/apache/giraph/utils/TestProgressableUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.hadoop.util.Progressable; +import org.junit.Test; + +import junit.framework.Assert; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; + +/** + * Test ProgressableUtils + */ +public class TestProgressableUtils { + @Test + public void testProgressableUtils() throws NoSuchFieldException, + IllegalAccessException { + final int sleepTime = 1800; + final int msecPeriod = 500; + ExecutorService executor = Executors.newFixedThreadPool(1); + executor.submit(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw new IllegalStateException(); + } + } + }); + executor.shutdown(); + CountProgressable countProgressable = new CountProgressable(); + ProgressableUtils.awaitExecutorTermination(executor, countProgressable, + msecPeriod); + 
Assert.assertTrue(countProgressable.counter >= sleepTime / msecPeriod); + Assert.assertTrue( + countProgressable.counter <= (sleepTime + msecPeriod) / msecPeriod); + } + + private static class CountProgressable implements Progressable { + private int counter = 0; + + @Override + public void progress() { + counter++; + } + } +}
