Repository: incubator-reef Updated Branches: refs/heads/master 84feeaf6a -> c9e1bf9b3
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java index b70e9df..55b5fb0 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/BlockingJoin.java @@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentSkipListSet; public class BlockingJoin implements StaticObservable { private final Observer<TupleEvent> out; private final ConcurrentSkipListSet<TupleEvent> left = new ConcurrentSkipListSet<>(); - boolean leftDone = false; + private boolean leftDone = false; public BlockingJoin(final Observer<TupleEvent> out) { this.out = out; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java index 22ed659..43480dd 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/join/TupleSource.java @@ -23,8 +23,8 @@ import org.apache.reef.wake.rx.Observer; import org.apache.reef.wake.rx.StaticObservable; public class TupleSource implements StaticObservable, Stage { - final Thread[] threads; - final Observer<TupleEvent> out; + private final Thread[] threads; + private final Observer<TupleEvent> out; public TupleSource(final Observer<TupleEvent> out, final int max, final int numThreads, final boolean evenOnly) { this.out = out; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java index 07771a0..8c3ec92 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/MergingEventHandler.java @@ -39,13 +39,21 @@ import java.util.logging.Logger; public final class MergingEventHandler<L, R> { private static final Logger LOG = Logger.getLogger(MergingEventHandler.class.getName()); - public final EventHandler<L> left = new Left(); - public final EventHandler<R> right = new Right(); + private final EventHandler<L> left = new Left(); + private final EventHandler<R> right = new Right(); private final Object mutex = new Object(); private final EventHandler<Pair<L, R>> destination; private L leftEvent; private R rightEvent; + public EventHandler<L> getLeft() { + return left; + } + + public EventHandler<R> getRight() { + return right; + } + @Inject public MergingEventHandler(final EventHandler<Pair<L, R>> destination) { this.destination = destination; @@ -61,8 +69,16 @@ public final class MergingEventHandler<L, R> { } public static final class Pair<S1, S2> { - public final S1 first; - public final S2 second; + private final S1 first; + private final S2 second; + + public S1 getFirst() { + return first; + } + + public S2 getSecond() { + return second; + } private Pair(final S1 s1, final S2 s2) { this.first = s1; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java index 72714d6..10baa2e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java @@ -138,8 +138,8 @@ public class WakeProfiler implements Aspect { final Object o = methodProxy.invokeSuper(object, args); final long stop = System.nanoTime(); - s.messageCount.incrementAndGet(); - s.sumLatency.addAndGet(stop - start); + s.getMessageCount().incrementAndGet(); + s.getSumLatency().addAndGet(stop - start); return o; @@ -242,8 +242,8 @@ public class WakeProfiler implements Aspect { } else { final Stats stat = stats.get(o); if (stat != null) { - final long cnt = stat.messageCount.get(); - final long lat = stat.sumLatency.get(); + final long cnt = stat.getMessageCount().get(); + final long lat = stat.getSumLatency().get(); tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\""); // quote the latency, since it might be nan } else { @@ -283,7 +283,7 @@ public class WakeProfiler implements Aspect { final Stats s = stats.get(w.getObject()); if (s != null) { links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + - (s.messageCount.get() + 3.0) + "}"); + (s.getMessageCount().get() + 3.0) + "}"); } else { links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + 1.0 + "}"); } @@ -302,8 +302,16 @@ public class WakeProfiler implements Aspect { } private final class Stats { - AtomicLong messageCount = new AtomicLong(0); - AtomicLong sumLatency = new AtomicLong(0); + private AtomicLong messageCount = new AtomicLong(0); + private AtomicLong sumLatency = new AtomicLong(0); + + AtomicLong getMessageCount() { + return messageCount; + } + + AtomicLong getSumLatency() { + return sumLatency; + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java index 2221f74..0e3140c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java @@ -22,10 +22,10 @@ import org.apache.reef.wake.Identifiable; import org.apache.reef.wake.Identifier; public class ReadRequest implements Identifiable { - final StorageIdentifier f; - final long offset; - final byte[] buf; - final Identifier id; + private final StorageIdentifier f; + private final long offset; + private final byte[] buf; + private final Identifier id; public ReadRequest(final StorageIdentifier f, final long offset, final byte[] buf, final Identifier id) { this.f = f; @@ -38,4 +38,13 @@ public class ReadRequest implements Identifiable { public Identifier getId() { return id; } + public StorageIdentifier getF() { + return f; + } + public long getOffset() { + return offset; + } + public byte[] getBuf() { + return buf; + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadResponse.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadResponse.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadResponse.java index b08fa98..b8322db 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadResponse.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadResponse.java @@ -21,13 +21,23 @@ package org.apache.reef.wake.storage; import org.apache.reef.wake.Identifier; public class ReadResponse { - final byte[] buf; - final int bytesRead; - final Identifier reqId; + private final byte[] buf; + private final int bytesRead; + private final Identifier reqId; public ReadResponse(final byte[] buf, final int bytesRead, final Identifier reqId) { this.buf = buf; this.bytesRead = bytesRead; this.reqId = reqId; } + +/* public byte[] getBuf() { + return buf; + } + public int getBytesRead() { + return bytesRead; + } + public Identifier getReqId() { + return reqId; + }*/ } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java index a864f53..3a5ae29 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java @@ -25,19 +25,19 @@ import java.io.FileInputStream; import java.io.IOException; public class SequentialFileReader implements EStage<ReadRequest> { - final EventHandler<ReadResponse> dest = null; - final FileHandlePool fdPool = new FileHandlePool(); + private final EventHandler<ReadResponse> dest = null; + private final FileHandlePool fdPool = new FileHandlePool(); @Override public void onNext(final ReadRequest value) { - final FileInputStream fin = fdPool.get(value.f); + final FileInputStream fin = fdPool.get(value.getF()); int readSoFar = 0; try { synchronized (fin) { fin.reset(); - fin.skip(value.offset); - while (readSoFar != value.buf.length) { - final int ret = fin.read(value.buf, readSoFar, value.buf.length); + fin.skip(value.getOffset()); + while (readSoFar != value.getBuf().length) { + final int ret = fin.read(value.getBuf(), readSoFar, value.getBuf().length); if (ret == -1) { break; } @@ -45,11 +45,11 @@ public class SequentialFileReader implements EStage<ReadRequest> { } } } catch (final IOException e) { - fdPool.release(value.f, fin); + fdPool.release(value.getF(), fin); // err.onNext(null); //new ReadError(e)); } - fdPool.release(value.f, fin); - dest.onNext(new ReadResponse(value.buf, readSoFar, value.id)); + fdPool.release(value.getF(), fin); + dest.onNext(new ReadResponse(value.getBuf(), readSoFar, value.getId())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MergingEventHandlerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MergingEventHandlerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MergingEventHandlerTest.java index cd439c0..dac723e 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MergingEventHandlerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MergingEventHandlerTest.java @@ -44,12 +44,12 @@ public class MergingEventHandlerTest { new MergingEventHandler<>(new EventHandler<Pair<Integer, Integer>>() { @Override public void onNext(final Pair<Integer, Integer> value) { - i.addAndGet(value.first + 31 * value.second); + i.addAndGet(value.getFirst() + 31 * value.getSecond()); } }); - dut.left.onNext(testLeft); - dut.right.onNext(testRight); + dut.getLeft().onNext(testLeft); + dut.getRight().onNext(testRight); Assert.assertEquals(expected, i.get()); } @@ -67,12 +67,12 @@ public class MergingEventHandlerTest { new MergingEventHandler<>(new EventHandler<Pair<Integer, Integer>>() { @Override public void onNext(final Pair<Integer, Integer> value) { - i.addAndGet(value.first + 17 * value.second); + i.addAndGet(value.getFirst() + 17 * value.getSecond()); } }); - dut.right.onNext(testRight); - dut.left.onNext(testLeft); + dut.getRight().onNext(testRight); + dut.getLeft().onNext(testLeft); Assert.assertEquals(expected, i.get()); } @@ -93,15 +93,15 @@ public class MergingEventHandlerTest { new MergingEventHandler<>(new EventHandler<Pair<Integer, Integer>>() { @Override public void onNext(final Pair<Integer, Integer> value) { - i.addAndGet(value.first + 31 * value.second); + i.addAndGet(value.getFirst() + 31 * value.getSecond()); } }); - dut.left.onNext(testLeft1); - dut.right.onNext(testRight1); + dut.getLeft().onNext(testLeft1); + dut.getRight().onNext(testRight1); - dut.left.onNext(testLeft2); - dut.right.onNext(testRight2); + dut.getLeft().onNext(testLeft2); + dut.getRight().onNext(testRight2); Assert.assertEquals(expected1 + expected2, i.get()); } @@ -122,7 +122,7 @@ public class MergingEventHandlerTest { new MergingEventHandler<>(new EventHandler<Pair<Integer, Integer>>() { @Override public void onNext(final Pair<Integer, Integer> value) { - i.addAndGet(value.first + 31 * value.second); + i.addAndGet(value.getFirst() + 31 * value.getSecond()); } }); @@ -132,16 +132,16 @@ public class MergingEventHandlerTest { pool.submit(new Runnable() { @Override public void run() { - dut.left.onNext(testLeft1); - dut.right.onNext(testRight2); + dut.getLeft().onNext(testLeft1); + dut.getRight().onNext(testRight2); } }); pool.submit(new Runnable() { @Override public void run() { - dut.right.onNext(testRight1); - dut.left.onNext(testLeft2); + dut.getRight().onNext(testRight1); + dut.getLeft().onNext(testLeft2); } }); @@ -189,7 +189,7 @@ public class MergingEventHandlerTest { @Override public void run() { for (int kk = 0; kk < eventsPerLeft; ++kk) { - dut.left.onNext(kk); + dut.getLeft().onNext(kk); } } }); @@ -200,7 +200,7 @@ public class MergingEventHandlerTest { @Override public void run() { for (int kk = 0; kk < eventsPerRight; ++kk) { - dut.right.onNext(kk); + dut.getRight().onNext(kk); } } }); @@ -229,8 +229,8 @@ public class MergingEventHandlerTest { } }); - dut.left.onNext(true); - dut.right.onNext(104.0); + dut.getLeft().onNext(true); + dut.getRight().onNext(104.0); Assert.assertEquals(1, i.get()); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java index df654a2..1007215 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java @@ -160,8 +160,8 @@ public final class HttpServerReefEventHandler implements HttpHandler { final ArrayList<String> exits = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix, LoggingScopeImpl.DURATION); - final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.startIndicators); - final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.endIndicators); + final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.START_INDICATORS); + final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.END_INDICATORS); final ArrayList<String> result = LogParser.mergeStages(startsStages, endStages); writeLines(response, result, "Current Stages..."); break; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestAvroSerializerForHttp.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestAvroSerializerForHttp.java b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestAvroSerializerForHttp.java index f0adce3..a31d413 100644 --- a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestAvroSerializerForHttp.java +++ b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestAvroSerializerForHttp.java @@ -117,8 +117,8 @@ public class TestAvroSerializerForHttp { } static class EvaluatorDescriptorMock implements EvaluatorDescriptor { - final NodeDescriptor nodeDescriptor; - final EvaluatorProcessFactory evaluatorProcessFactory; + private final NodeDescriptor nodeDescriptor; + private final EvaluatorProcessFactory evaluatorProcessFactory; @Inject EvaluatorDescriptorMock(final NodeDescriptor nodeDescriptor, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c9e1bf9b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestReefEventStateManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestReefEventStateManager.java b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestReefEventStateManager.java index 3b16df0..9ef49ce 100644 --- a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestReefEventStateManager.java +++ b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestReefEventStateManager.java @@ -95,7 +95,7 @@ public class TestReefEventStateManager { final class MockEvaluatorDescriptor implements EvaluatorDescriptor { private final NodeDescriptor nodeDescriptor; - final EvaluatorProcessFactory evaluatorProcessFactory; + private final EvaluatorProcessFactory evaluatorProcessFactory; @Inject public MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor,
