http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingEventHandlerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingEventHandlerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingEventHandlerTest.java index 4ff123b..0117e76 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingEventHandlerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingEventHandlerTest.java @@ -34,10 +34,10 @@ public class BlockingEventHandlerTest { final AtomicInteger i = new AtomicInteger(0); final AtomicInteger cnt = new AtomicInteger(0); - BlockingEventHandler<Integer> h = new BlockingEventHandler<>(1, new EventHandler<Iterable<Integer>>() { + final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(1, new EventHandler<Iterable<Integer>>() { @Override - public void onNext(Iterable<Integer> value) { - for (int x : value) { + public void onNext(final Iterable<Integer> value) { + for (final int x : value) { i.getAndAdd(x); cnt.incrementAndGet(); } @@ -53,10 +53,10 @@ public class BlockingEventHandlerTest { final AtomicInteger i = new AtomicInteger(0); final AtomicInteger cnt = new AtomicInteger(0); - BlockingEventHandler<Integer> h = new BlockingEventHandler<>(1, new EventHandler<Iterable<Integer>>() { + final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(1, new EventHandler<Iterable<Integer>>() { @Override - public void onNext(Iterable<Integer> value) { - for (int x : value) { + public void onNext(final Iterable<Integer> value) { + for (final int x : value) { i.getAndAdd(x); cnt.incrementAndGet(); } @@ -77,15 +77,15 @@ public class BlockingEventHandlerTest { final int num = 1000; final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(2 * num, new EventHandler<Iterable<Integer>>() { @Override - public void onNext(Iterable<Integer> value) { - for (int x : value) { + public void onNext(final Iterable<Integer> value) { + for (final int x : value) { i.getAndAdd(x); cnt.incrementAndGet(); } } }); - Runnable r = new Runnable() { + final Runnable r = new Runnable() { @Override public void run() { for (int i = 0; i < num; i++) { @@ -93,14 +93,14 @@ public class BlockingEventHandlerTest { } } }; - Thread a = new Thread(r); - Thread b = new Thread(r); + final Thread a = new Thread(r); + final Thread b = new Thread(r); a.start(); b.start(); try { a.join(); b.join(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { fail(e.toString()); } @@ -118,8 +118,8 @@ public class BlockingEventHandlerTest { final int events = 10; final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(2 * num, new EventHandler<Iterable<Integer>>() { @Override - public void onNext(Iterable<Integer> value) { - for (int x : value) { + public void onNext(final Iterable<Integer> value) { + for (final int x : value) { i.getAndAdd(x); cnt.incrementAndGet(); } @@ -127,7 +127,7 @@ public class BlockingEventHandlerTest { } }); - Runnable r = new Runnable() { + final Runnable r = new Runnable() { @Override public void run() { for (int i = 0; i < num * events; i++) { @@ -135,14 +135,14 @@ public class BlockingEventHandlerTest { } } }; - Thread a = new Thread(r); - Thread b = new Thread(r); + final Thread a = new Thread(r); + final Thread b = new Thread(r); a.start(); b.start(); try { a.join(); b.join(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { fail(e.toString()); } @@ -160,8 +160,8 @@ public class BlockingEventHandlerTest { final int num = 1000; final BlockingEventHandler<Integer> h = new BlockingEventHandler<>(num, new EventHandler<Iterable<Integer>>() { @Override - public void onNext(Iterable<Integer> value) { - for (int x : value) { + public void onNext(final Iterable<Integer> value) { + for (final int x : value) { i.getAndAdd(x); cnt.incrementAndGet(); } @@ -169,25 +169,25 @@ public class BlockingEventHandlerTest { }); final int val = 7; - Runnable r = new Runnable() { + final Runnable r = new Runnable() { @Override public void run() { h.onNext(val); } }; - Thread[] workers = new Thread[num]; + final Thread[] workers = new Thread[num]; for (int ii = 0; ii < workers.length; ii++) { workers[ii] = new Thread(r); } - for (Thread w : workers) { + for (final Thread w : workers) { w.start(); } try { - for (Thread w : workers) { + for (final Thread w : workers) { w.join(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { fail(e.toString()); }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingSignalEventHandlerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingSignalEventHandlerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingSignalEventHandlerTest.java index 99f0276..506cef2 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingSignalEventHandlerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/BlockingSignalEventHandlerTest.java @@ -33,9 +33,9 @@ public class BlockingSignalEventHandlerTest { public void testSingle() { final AtomicInteger cnt = new AtomicInteger(0); - BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(1, new EventHandler<Integer>() { + final BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(1, new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { cnt.incrementAndGet(); } }); @@ -47,9 +47,9 @@ public class BlockingSignalEventHandlerTest { public void testMultiple() { final AtomicInteger cnt = new AtomicInteger(0); - BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(2, new EventHandler<Integer>() { + final BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(2, new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { cnt.incrementAndGet(); } }); @@ -66,12 +66,12 @@ public class BlockingSignalEventHandlerTest { final BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(2 * num, new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { cnt.incrementAndGet(); } }); - Runnable r = new Runnable() { + final Runnable r = new Runnable() { @Override public void run() { for (int i = 0; i < num; i++) { @@ -79,14 +79,14 @@ public class BlockingSignalEventHandlerTest { } } }; - Thread a = new Thread(r); - Thread b = new Thread(r); + final Thread a = new Thread(r); + final Thread b = new Thread(r); a.start(); b.start(); try { a.join(); b.join(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { fail(e.toString()); } @@ -102,12 +102,12 @@ public class BlockingSignalEventHandlerTest { final BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(2 * num, new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { cnt.incrementAndGet(); } }); - Runnable r = new Runnable() { + final Runnable r = new Runnable() { @Override public void run() { for (int i = 0; i < num * events; i++) { @@ -115,14 +115,14 @@ public class BlockingSignalEventHandlerTest { } } }; - Thread a = new Thread(r); - Thread b = new Thread(r); + final Thread a = new Thread(r); + final Thread b = new Thread(r); a.start(); b.start(); try { a.join(); b.join(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { fail(e.toString()); } @@ -136,31 +136,31 @@ public class BlockingSignalEventHandlerTest { final int num = 1000; final BlockingSignalEventHandler<Integer> h = new BlockingSignalEventHandler<>(num, new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { cnt.incrementAndGet(); } }); final int val = 7; - Runnable r = new Runnable() { + final Runnable r = new Runnable() { @Override public void run() { h.onNext(val); } }; - Thread[] workers = new Thread[num]; + final Thread[] workers = new Thread[num]; for (int ii = 0; ii < workers.length; ii++) { workers[ii] = new Thread(r); } - for (Thread w : workers) { + for (final Thread w : workers) { w.start(); } try { - for (Thread w : workers) { + for (final Thread w : workers) { w.join(); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { fail(e.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ForkPoolStageTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ForkPoolStageTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ForkPoolStageTest.java index 74b8cec..e2e7fe3 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ForkPoolStageTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ForkPoolStageTest.java @@ -41,17 +41,17 @@ public class ForkPoolStageTest { public void testPoolStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); + final EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); - WakeSharedPool p = new WakeSharedPool(10); + final WakeSharedPool p = new WakeSharedPool(10); - EStage<TestEventA> stage = new ForkPoolStage<TestEventA>(eventHandler, p); + final EStage<TestEventA> stage = new ForkPoolStage<TestEventA>(eventHandler, p); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); orgSet.add(a); stage.onNext(a); @@ -71,24 +71,24 @@ public class ForkPoolStageTest { public void testSharedPoolStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); + final EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); - WakeSharedPool p = new WakeSharedPool(10); + final WakeSharedPool p = new WakeSharedPool(10); - EStage<TestEventA> stage1 = new ForkPoolStage<TestEventA>(eventHandler, p); - EStage<TestEventA> stage2 = new ForkPoolStage<TestEventA>(eventHandler, p); + final EStage<TestEventA> stage1 = new ForkPoolStage<TestEventA>(eventHandler, p); + final EStage<TestEventA> stage2 = new ForkPoolStage<TestEventA>(eventHandler, p); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); orgSet.add(a); stage1.onNext(a); } for (int i = 10; i < 20; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); orgSet.add(a); stage2.onNext(a); @@ -109,21 +109,21 @@ public class ForkPoolStageTest { public void testMultiSharedPoolStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Map<Class<? extends TestEvent>, EventHandler<? extends TestEvent>> map + final Map<Class<? extends TestEvent>, EventHandler<? extends TestEvent>> map = new HashMap<Class<? extends TestEvent>, EventHandler<? extends TestEvent>>(); map.put(TestEventA.class, new TestEventHandlerA(procSet)); map.put(TestEventB.class, new TestEventHandlerB(procSet)); - EventHandler<TestEvent> eventHandler = new MultiEventHandler<TestEvent>(map); + final EventHandler<TestEvent> eventHandler = new MultiEventHandler<TestEvent>(map); - WakeSharedPool p = new WakeSharedPool(10); - EStage<TestEvent> stage = new ForkPoolStage<TestEvent>(eventHandler, p); + final WakeSharedPool p = new WakeSharedPool(10); + final EStage<TestEvent> stage = new ForkPoolStage<TestEvent>(eventHandler, p); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); - TestEventB b = new TestEventB(); + final TestEventA a = new TestEventA(); + final TestEventB b = new TestEventB(); orgSet.add(a); orgSet.add(b); @@ -144,11 +144,11 @@ public class ForkPoolStageTest { @Test public void testMeter() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - WakeSharedPool p = new WakeSharedPool(10); - EventHandler<TestEvent> eventHandler = new TestEventHandler(); - ForkPoolStage<TestEvent> stage = new ForkPoolStage<TestEvent>(eventHandler, p); + final WakeSharedPool p = new WakeSharedPool(10); + final EventHandler<TestEvent> eventHandler = new TestEventHandler(); + final ForkPoolStage<TestEvent> stage = new ForkPoolStage<TestEvent>(eventHandler, p); - TestEvent e = new TestEvent(); + final TestEvent e = new TestEvent(); for (int i = 0; i < 1000000; ++i) { stage.onNext(e); } @@ -161,12 +161,12 @@ public class ForkPoolStageTest { @Test public void testMeterTwoStages() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - WakeSharedPool p = new WakeSharedPool(10); - EventHandler<TestEvent> eventHandler = new TestEventHandler(); - ForkPoolStage<TestEvent> stage2 = new ForkPoolStage<TestEvent>(eventHandler, p); - ForkPoolStage<TestEvent> stage1 = new ForkPoolStage<TestEvent>(stage2, p); + final WakeSharedPool p = new WakeSharedPool(10); + final EventHandler<TestEvent> eventHandler = new TestEventHandler(); + final ForkPoolStage<TestEvent> stage2 = new ForkPoolStage<TestEvent>(eventHandler, p); + final ForkPoolStage<TestEvent> stage1 = new ForkPoolStage<TestEvent>(stage2, p); - TestEvent e = new TestEvent(); + final TestEvent e = new TestEvent(); for (int i = 0; i < 1000000; ++i) { stage1.onNext(e); } @@ -195,7 +195,7 @@ public class ForkPoolStageTest { TestEventHandler() { } - public void onNext(TestEvent e) { + public void onNext(final TestEvent e) { // no op } } @@ -203,11 +203,11 @@ public class ForkPoolStageTest { class TestEventHandlerA implements EventHandler<TestEventA> { private final Set<TestEvent> set; - TestEventHandlerA(Set<TestEvent> set) { + TestEventHandlerA(final Set<TestEvent> set) { this.set = set; } - public void onNext(TestEventA e) { + public void onNext(final TestEventA e) { set.add(e); System.out.println("TestEventHandlerA " + e); } @@ -216,11 +216,11 @@ public class ForkPoolStageTest { class TestEventHandlerB implements EventHandler<TestEventB> { private final Set<TestEvent> set; - TestEventHandlerB(Set<TestEvent> set) { + TestEventHandlerB(final Set<TestEvent> set) { this.set = set; } - public void onNext(TestEventB e) { + public void onNext(final TestEventB e) { set.add(e); System.out.println("TestEventHandlerB " + e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/IndependentIterationsThreadPoolStageTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/IndependentIterationsThreadPoolStageTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/IndependentIterationsThreadPoolStageTest.java index 2d4d8bd..77c44f6 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/IndependentIterationsThreadPoolStageTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/IndependentIterationsThreadPoolStageTest.java @@ -36,19 +36,19 @@ public class IndependentIterationsThreadPoolStageTest { public void testOneIteration() { final AtomicInteger x = new AtomicInteger(0); final int val = 101; - IndependentIterationsThreadPoolStage<Integer> dut = new IndependentIterationsThreadPoolStage<>( + final IndependentIterationsThreadPoolStage<Integer> dut = new IndependentIterationsThreadPoolStage<>( new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { x.addAndGet(value); } }, 1, 1); - List<Integer> ll = new ArrayList<>(); + final List<Integer> ll = new ArrayList<>(); ll.add(val); dut.onNext(ll); try { dut.close(); - } catch (Exception e) { + } catch (final Exception e) { fail(e.toString()); } assertEquals(val, x.get()); @@ -63,10 +63,10 @@ public class IndependentIterationsThreadPoolStageTest { ll.add(i); } - IndependentIterationsThreadPoolStage<Integer> dut = new IndependentIterationsThreadPoolStage<>( + final IndependentIterationsThreadPoolStage<Integer> dut = new IndependentIterationsThreadPoolStage<>( new EventHandler<Integer>() { @Override - public void onNext(Integer value) { + public void onNext(final Integer value) { Logger.getAnonymousLogger().info("Yow" + value); x.addAndGet(value); } @@ -76,7 +76,7 @@ public class IndependentIterationsThreadPoolStageTest { try { dut.close(); - } catch (Exception e) { + } catch (final Exception e) { fail(e.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MetricsTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MetricsTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MetricsTest.java index f996192..e23703d 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MetricsTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/MetricsTest.java @@ -37,8 +37,8 @@ public class MetricsTest { public void testHistogram() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Histogram histogram = new UniformHistogram(10, 100); - Random rand = new Random(1); + final Histogram histogram = new UniformHistogram(10, 100); + final Random rand = new Random(1); for (int i = 0; i < 1000000; ++i) { long value = rand.nextLong() % 1000; value = value >= 0 ? value : (-1) * value; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/PubSubThreadPoolStageTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/PubSubThreadPoolStageTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/PubSubThreadPoolStageTest.java index 2836811..b401989 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/PubSubThreadPoolStageTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/PubSubThreadPoolStageTest.java @@ -45,21 +45,21 @@ public class PubSubThreadPoolStageTest { public void testPubSubThreadPoolStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 5000, 5000); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 5000, 5000); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - int expected = 10; + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final int expected = 10; - PubSubEventHandler<TestEvent> handler = new PubSubEventHandler<TestEvent>(); + final PubSubEventHandler<TestEvent> handler = new PubSubEventHandler<TestEvent>(); handler.subscribe(TestEvent.class, new TestEventHandler("Handler1", monitor, procSet, expected)); handler.subscribe(TestEvent.class, new TestEventHandler("Handler2", monitor, procSet, expected)); - EStage<TestEvent> stage = new ThreadPoolStage<TestEvent>(handler, 10); + final EStage<TestEvent> stage = new ThreadPoolStage<TestEvent>(handler, 10); for (int i = 0; i < expected; ++i) { - TestEvent a = new TestEvent("aaa"); + final TestEvent a = new TestEvent("aaa"); orgSet.add(a); stage.onNext(a); @@ -80,7 +80,7 @@ public class PubSubThreadPoolStageTest { class TestEvent { private final String msg; - public TestEvent(String msg) { + public TestEvent(final String msg) { this.msg = msg; } @@ -96,7 +96,7 @@ public class PubSubThreadPoolStageTest { private final Set<TestEvent> set; private final int expected; - TestEventHandler(String name, Monitor monitor, Set<TestEvent> set, int expected) { + TestEventHandler(final String name, final Monitor monitor, final Set<TestEvent> set, final int expected) { this.name = name; this.monitor = monitor; this.set = set; @@ -104,7 +104,7 @@ public class PubSubThreadPoolStageTest { } @Override - public void onNext(TestEvent e) { + public void onNext(final TestEvent e) { set.add(e); System.out.println(name + " " + e + " " + e.getMsg()); if (set.size() == expected) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/StageManagerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/StageManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/StageManagerTest.java index dc2a000..4094a1c 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/StageManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/StageManagerTest.java @@ -40,7 +40,7 @@ public class StageManagerTest { class TestEventHandler implements EventHandler<Void> { @Override - public void onNext(Void value) { + public void onNext(final Void value) { } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/SyncStageTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/SyncStageTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/SyncStageTest.java index 552b0ab..a46f6a6 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/SyncStageTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/SyncStageTest.java @@ -40,13 +40,13 @@ public class SyncStageTest { public void testSyncStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - EStage<TestEventA> stage = new SyncStage<TestEventA>(new TestEventHandlerA(procSet)); + final EStage<TestEventA> stage = new SyncStage<TestEventA>(new TestEventHandlerA(procSet)); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); orgSet.add(a); stage.onNext(a); @@ -61,21 +61,21 @@ public class SyncStageTest { public void testMultiSyncStage() throws Exception { System.out.println(name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Map<Class<? extends TestEvent>, EventHandler<? extends TestEvent>> map + final Map<Class<? extends TestEvent>, EventHandler<? extends TestEvent>> map = new HashMap<Class<? extends TestEvent>, EventHandler<? extends TestEvent>>(); map.put(TestEventA.class, new TestEventHandlerA(procSet)); map.put(TestEventB.class, new TestEventHandlerB(procSet)); - EventHandler<TestEvent> eventHandler = new MultiEventHandler<TestEvent>(map); + final EventHandler<TestEvent> eventHandler = new MultiEventHandler<TestEvent>(map); - EStage<TestEvent> stage = new SyncStage<TestEvent>(eventHandler); + final EStage<TestEvent> stage = new SyncStage<TestEvent>(eventHandler); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); - TestEventB b = new TestEventB(); + final TestEventA a = new TestEventA(); + final TestEventB b = new TestEventB(); orgSet.add(a); orgSet.add(b); @@ -103,11 +103,11 @@ public class SyncStageTest { private final Set<TestEvent> set; - TestEventHandlerA(Set<TestEvent> set) { + TestEventHandlerA(final Set<TestEvent> set) { this.set = set; } - public void onNext(TestEventA e) { + public void onNext(final TestEventA e) { set.add(e); System.out.println("TestEventHandlerA " + e); } @@ -117,11 +117,11 @@ public class SyncStageTest { private final Set<TestEvent> set; - TestEventHandlerB(Set<TestEvent> set) { + TestEventHandlerB(final Set<TestEvent> set) { this.set = set; } - public void onNext(TestEventB e) { + public void onNext(final TestEventB e) { set.add(e); System.out.println("TestEventHandlerB " + e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ThreadPoolStageTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ThreadPoolStageTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ThreadPoolStageTest.java index d4a5048..59730ff 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ThreadPoolStageTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/ThreadPoolStageTest.java @@ -41,14 +41,14 @@ public class ThreadPoolStageTest { public void testSingleThreadStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); - EStage<TestEventA> stage = new SingleThreadStage<TestEventA>(eventHandler, 20); + final EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); + final EStage<TestEventA> stage = new SingleThreadStage<TestEventA>(eventHandler, 20); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); orgSet.add(a); stage.onNext(a); @@ -67,12 +67,12 @@ public class ThreadPoolStageTest { public void testSingleThreadStageQueueFull() { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); final EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); final EStage<TestEventA> stage = new SingleThreadStage<TestEventA>(eventHandler, 1); for (int i = 0; i < 10000; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); stage.onNext(a); } @@ -83,15 +83,15 @@ public class ThreadPoolStageTest { public void testThreadPoolStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); + final EventHandler<TestEventA> eventHandler = new TestEventHandlerA(procSet); - EStage<TestEventA> stage = new ThreadPoolStage<TestEventA>(eventHandler, 10); + final EStage<TestEventA> stage = new ThreadPoolStage<TestEventA>(eventHandler, 10); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); + final TestEventA a = new TestEventA(); orgSet.add(a); stage.onNext(a); @@ -110,20 +110,20 @@ public class ThreadPoolStageTest { public void testMultiThreadPoolStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> procSet = Collections.synchronizedSet(new HashSet<TestEvent>()); + final Set<TestEvent> orgSet = Collections.synchronizedSet(new HashSet<TestEvent>()); - Map<Class<? extends TestEvent>, EventHandler<? extends TestEvent>> map + final Map<Class<? extends TestEvent>, EventHandler<? extends TestEvent>> map = new HashMap<Class<? extends TestEvent>, EventHandler<? extends TestEvent>>(); map.put(TestEventA.class, new TestEventHandlerA(procSet)); map.put(TestEventB.class, new TestEventHandlerB(procSet)); - EventHandler<TestEvent> eventHandler = new MultiEventHandler<TestEvent>(map); + final EventHandler<TestEvent> eventHandler = new MultiEventHandler<TestEvent>(map); - EStage<TestEvent> stage = new ThreadPoolStage<TestEvent>(eventHandler, 10); + final EStage<TestEvent> stage = new ThreadPoolStage<TestEvent>(eventHandler, 10); for (int i = 0; i < 10; ++i) { - TestEventA a = new TestEventA(); - TestEventB b = new TestEventB(); + final TestEventA a = new TestEventA(); + final TestEventB b = new TestEventB(); orgSet.add(a); orgSet.add(b); @@ -143,10 +143,10 @@ public class ThreadPoolStageTest { @Test public void testMeter() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - EventHandler<TestEvent> eventHandler = new TestEventHandler(); - ThreadPoolStage<TestEvent> stage = new ThreadPoolStage<TestEvent>(eventHandler, 10); + final EventHandler<TestEvent> eventHandler = new TestEventHandler(); + final ThreadPoolStage<TestEvent> stage = new ThreadPoolStage<TestEvent>(eventHandler, 10); - TestEvent e = new TestEvent(); + final TestEvent e = new TestEvent(); for (int i = 0; i < 1000000; ++i) { stage.onNext(e); } @@ -159,11 +159,11 @@ public class ThreadPoolStageTest { @Test public void testMeterTwoStages() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - EventHandler<TestEvent> eventHandler = new TestEventHandler(); - ThreadPoolStage<TestEvent> stage2 = new ThreadPoolStage<TestEvent>(eventHandler, 5); - ThreadPoolStage<TestEvent> stage1 = new ThreadPoolStage<TestEvent>(stage2, 5); + final EventHandler<TestEvent> eventHandler = new TestEventHandler(); + final ThreadPoolStage<TestEvent> stage2 = new ThreadPoolStage<TestEvent>(eventHandler, 5); + final ThreadPoolStage<TestEvent> stage1 = new ThreadPoolStage<TestEvent>(stage2, 5); - TestEvent e = new TestEvent(); + final TestEvent e = new TestEvent(); for (int i = 0; i < 1000000; ++i) { stage1.onNext(e); } @@ -190,7 +190,7 @@ public class ThreadPoolStageTest { TestEventHandler() { } - public void onNext(TestEvent e) { + public void onNext(final TestEvent e) { // no op } } @@ -198,11 +198,11 @@ public class ThreadPoolStageTest { class TestEventHandlerA implements EventHandler<TestEventA> { private final Set<TestEvent> set; - TestEventHandlerA(Set<TestEvent> set) { + TestEventHandlerA(final Set<TestEvent> set) { this.set = set; } - public void onNext(TestEventA e) { + public void onNext(final TestEventA e) { set.add(e); System.out.println("TestEventHandlerA " + e); } @@ -211,11 +211,11 @@ public class ThreadPoolStageTest { class TestEventHandlerB implements EventHandler<TestEventB> { private final Set<TestEvent> set; - TestEventHandlerB(Set<TestEvent> set) { + TestEventHandlerB(final Set<TestEvent> set) { this.set = set; } - public void onNext(TestEventB e) { + public void onNext(final TestEventB e) { set.add(e); System.out.println("TestEventHandlerB " + e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/TimerStageTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/TimerStageTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/TimerStageTest.java index c25f6e8..5cb73b0 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/TimerStageTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/TimerStageTest.java @@ -42,11 +42,11 @@ public class TimerStageTest { public void testTimerStage() throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); - Monitor monitor = new Monitor(); - int expected = 10; + final Monitor monitor = new Monitor(); + final int expected = 10; - TestEventHandler handler = new TestEventHandler(monitor, expected); - Stage stage = new TimerStage(handler, 100, SHUTDOWN_TIMEOUT); + final TestEventHandler handler = new TestEventHandler(monitor, expected); + final Stage stage = new TimerStage(handler, 100, SHUTDOWN_TIMEOUT); monitor.mwait(); @@ -61,12 +61,12 @@ public class TimerStageTest { private final int expected; private AtomicInteger count = new AtomicInteger(0); - TestEventHandler(Monitor monitor, int expected) { + TestEventHandler(final Monitor monitor, final int expected) { this.monitor = monitor; this.expected = expected; } - public void onNext(PeriodicEvent e) { + public void onNext(final PeriodicEvent e) { count.incrementAndGet(); System.out.println(count.get() + " " + e + " scheduled event at " + System.currentTimeMillis()); if (count.get() == expected) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/SkipListTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/SkipListTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/SkipListTest.java index c7b41c1..c21fed6 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/SkipListTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/SkipListTest.java @@ -25,8 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger; public class SkipListTest { - public static void main(String[] arg) { - SkipListTest t = new SkipListTest(); + public static void main(final String[] arg) { + final SkipListTest t = new SkipListTest(); t.testPoll(); t.testHigher(); @@ -43,22 +43,22 @@ public class SkipListTest { public void testPoll() { System.out.println("poll"); final int unique = 2000000; - ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); while (x.pollFirstEntry() != null) { // } - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed @@ -69,40 +69,40 @@ public class SkipListTest { public void testHigher() { System.out.println("higher"); final int unique = 2000000; - ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); Integer k = x.pollFirstEntry().getKey(); while ((k = x.higherKey(k)) != null) { // } - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed + " seconds (" + ((double) unique) / (outelapsed * 1000.0 * 1000.0) + " million events/sec)"); } - public boolean nThreads(int n, Runnable r, long timeout, TimeUnit t) { - ExecutorService e = Executors.newCachedThreadPool(); + public boolean nThreads(final int n, final Runnable r, final long timeout, final TimeUnit t) { + final ExecutorService e = Executors.newCachedThreadPool(); for (int i = 0; i < n; i++) { e.submit(r); } e.shutdown(); try { return e.awaitTermination(timeout, t); - } catch (InterruptedException e1) { + } catch (final InterruptedException e1) { e1.printStackTrace(); return false; } @@ -112,26 +112,26 @@ public class SkipListTest { public void testHigherRemove() { System.out.println("higher/remove"); final int unique = 2000000; - ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); Integer k = x.pollFirstEntry().getKey(); x.remove(k); while ((k = x.higherKey(k)) != null) { x.remove(k); } - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed @@ -144,15 +144,15 @@ public class SkipListTest { System.out.println("higher/remove " + numOutW); final int unique = 2000000; final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); Assert.assertTrue(nThreads(numOutW, new Runnable() { @Override @@ -164,11 +164,11 @@ public class SkipListTest { } } }, 30, TimeUnit.SECONDS)); - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed @@ -181,22 +181,22 @@ public class SkipListTest { System.out.println("higher/remove seek " + numOutW); final int unique = 2000000; final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); final AtomicInteger uid = new AtomicInteger(0); final int blockSize = unique / numOutW; Assert.assertTrue(nThreads(numOutW, new Runnable() { @Override public void run() { - int id = uid.getAndIncrement(); + final int id = uid.getAndIncrement(); final Integer startK = x.ceilingKey(blockSize * id); Integer k = startK; x.remove(k); @@ -205,11 +205,11 @@ public class SkipListTest { } } }, 30, TimeUnit.SECONDS)); - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed @@ -223,22 +223,22 @@ public class SkipListTest { System.out.println("higher/remove seek " + numOutW); final int unique = 2000000; final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); final AtomicInteger uid = new AtomicInteger(0); final int blockSize = unique / numOutW; Assert.assertTrue(nThreads(numOutW, new Runnable() { @Override public void run() { - int id = uid.getAndIncrement(); + final int id = uid.getAndIncrement(); final Integer startK = x.ceilingKey(blockSize * id); final Integer endK = x.ceilingKey(blockSize * (id + 1)); Integer k = startK; @@ -248,11 +248,11 @@ public class SkipListTest { } while (k != null && k < endK); } }, 30, TimeUnit.SECONDS)); - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed @@ -266,23 +266,23 @@ public class SkipListTest { System.out.println("higher/remove view " + numOutW); final int unique = 2000000; final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); final AtomicInteger uid = new AtomicInteger(0); final int blockSize = unique / numOutW; Assert.assertTrue(nThreads(numOutW, new Runnable() { @Override public void run() { - int id = uid.getAndIncrement(); - ConcurrentNavigableMap<Integer, Integer> myView = x.tailMap(blockSize * id); + final int id = uid.getAndIncrement(); + final ConcurrentNavigableMap<Integer, Integer> myView = x.tailMap(blockSize * id); final Integer endK = x.ceilingKey(blockSize * (id + 1)); Integer k = myView.pollFirstEntry().getKey(); do { @@ -294,11 +294,11 @@ public class SkipListTest { } while (true); } }, 30, TimeUnit.SECONDS)); - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed @@ -313,7 +313,7 @@ public class SkipListTest { System.out.println("separate maps " + numOutW); final int unique = 2000000 / numOutW; final ConcurrentSkipListMap<Integer, Integer>[] x = new ConcurrentSkipListMap[numOutW]; - long instart = System.currentTimeMillis(); + final long instart = System.currentTimeMillis(); { final AtomicInteger uid = new AtomicInteger(0); nThreads(numOutW, new Runnable() { @@ -330,18 +330,18 @@ public class SkipListTest { } , 10, TimeUnit.SECONDS); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); final AtomicInteger uid = new AtomicInteger(0); Assert.assertTrue(nThreads(numOutW, new Runnable() { @Override public void run() { - int id = uid.getAndIncrement(); - ConcurrentSkipListMap<Integer, Integer> mm = x[id]; + final int id = uid.getAndIncrement(); + final ConcurrentSkipListMap<Integer, Integer> mm = x[id]; final Integer startK = mm.pollFirstEntry().getKey(); Integer k = startK; mm.remove(k); @@ -350,11 +350,11 @@ public class SkipListTest { } } }, 30, TimeUnit.SECONDS)); - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); final int total = unique * numOutW; - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + total + " events in " + inelapsed + " seconds (" + ((double) total) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + total + " events in " + outelapsed @@ -368,15 +368,15 @@ public class SkipListTest { System.out.println("poll " + numOutW); final int unique = 2000000; final ConcurrentSkipListMap<Integer, Integer> x = new ConcurrentSkipListMap<>(); - long instart = System.currentTimeMillis(); + final long instart = System.currentTimeMillis(); for (int i = 0; i < unique; i++) { x.put(i, i); } - long inend = System.currentTimeMillis(); + final long inend = System.currentTimeMillis(); System.gc(); - long outstart = System.currentTimeMillis(); + final long outstart = System.currentTimeMillis(); Assert.assertTrue(nThreads(numOutW, new Runnable() { @Override @@ -388,11 +388,11 @@ public class SkipListTest { } } }, 30, TimeUnit.SECONDS)); - long outend = System.currentTimeMillis(); + final long outend = System.currentTimeMillis(); - double inelapsed = ((double) (inend - instart)) / 1000.0; - double outelapsed = ((double) (outend - outstart)) / 1000.0; + final double inelapsed = ((double) (inend - instart)) / 1000.0; + final double outelapsed = ((double) (outend - outstart)) / 1000.0; System.out.println("insert " + unique + " events in " + inelapsed + " seconds (" + ((double) unique) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("output " + unique + " events in " + outelapsed http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestBlockingJoin.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestBlockingJoin.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestBlockingJoin.java index 47b4487..dce77c8 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestBlockingJoin.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestBlockingJoin.java @@ -28,10 +28,10 @@ import org.junit.Test; public class TestBlockingJoin { @Test public void testJoin() throws Exception { - EventPrinter<TupleEvent> printer = new EventPrinter<>(); - BlockingJoin join = new BlockingJoin(printer); - TupleSource left = new TupleSource(join.wireLeft(), 256, 8, true); - TupleSource right = new TupleSource(join.wireRight(), 256, 8, false); + final EventPrinter<TupleEvent> printer = new EventPrinter<>(); + final BlockingJoin join = new BlockingJoin(printer); + final TupleSource left = new TupleSource(join.wireLeft(), 256, 8, true); + final TupleSource right = new TupleSource(join.wireRight(), 256, 8, false); left.close(); right.close(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestCombiner.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestCombiner.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestCombiner.java index 1717521..dc82307 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestCombiner.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestCombiner.java @@ -40,11 +40,11 @@ public class TestCombiner { @Test public void test() throws Exception { - CombinerStage<Integer, Integer> stage = new CombinerStage<Integer, Integer>( + final CombinerStage<Integer, Integer> stage = new CombinerStage<Integer, Integer>( new Combiner<Integer, Integer>() { @Override - public Integer combine(Integer key, Integer old, Integer cur) { + public Integer combine(final Integer key, final Integer old, final Integer cur) { if (old != null) { return old.intValue() + cur.intValue(); } else { @@ -56,20 +56,20 @@ public class TestCombiner { private AtomicInteger x = new AtomicInteger(0); @Override - public void onNext(Entry<Integer, Integer> value) { + public void onNext(final Entry<Integer, Integer> value) { System.out.println(value.getKey() + "=" + value.getValue()); x.incrementAndGet(); try { if (!done) { Thread.sleep(10); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IllegalStateException(e); } } @Override - public void onError(Exception error) { + public void onError(final Exception error) { System.err.println("onError called!"); error.printStackTrace(); } @@ -83,12 +83,12 @@ public class TestCombiner { o = stage.wireIn(); - WorkerThread[] workers = new WorkerThread[THREAD_COUNT]; + final WorkerThread[] workers = new WorkerThread[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { workers[i] = new WorkerThread(); } - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); for (int i = 0; i < THREAD_COUNT; i++) { workers[i].start(); } @@ -97,15 +97,15 @@ public class TestCombiner { } o.onCompleted(); - long inStop = System.currentTimeMillis(); + final long inStop = System.currentTimeMillis(); done = true; stage.close(); - long outStop = System.currentTimeMillis(); + final long outStop = System.currentTimeMillis(); - long eventCount = THREAD_COUNT * EVENTS_PER_THREAD; - double inelapsed = ((double) (inStop - start)) / 1000.0; - double inoutelapsed = ((double) (outStop - start)) / 1000.0; + final long eventCount = THREAD_COUNT * EVENTS_PER_THREAD; + final double inelapsed = ((double) (inStop - start)) / 1000.0; + final double inoutelapsed = ((double) (outStop - start)) / 1000.0; System.out.println("Emitted " + eventCount + " events in " + inelapsed + " seconds (" + ((double) eventCount) / (inelapsed * 1000.0 * 1000.0) + " million events/sec)"); System.out.println("Emitted/output " + eventCount + " events in " + inoutelapsed @@ -116,9 +116,9 @@ public class TestCombiner { private class WorkerThread extends Thread { @Override public void run() { - Random rand = new Random(); + final Random rand = new Random(); for (int i = 0; i < EVENTS_PER_THREAD; i++) { - int r = rand.nextInt(BUCKET_COUNT); + final int r = rand.nextInt(BUCKET_COUNT); o.onNext(new CombinerStage.Pair<Integer, Integer>(r, 1)); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestJoin.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestJoin.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestJoin.java index 29288c1..78a64a2 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestJoin.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestJoin.java @@ -28,10 +28,10 @@ import org.junit.Test; public class TestJoin { @Test public void testJoin() throws Exception { - EventPrinter<TupleEvent> printer = new EventPrinter<>(); - NonBlockingJoin join = new NonBlockingJoin(printer); - TupleSource left = new TupleSource(join.wireLeft(), 256, 8, true); - TupleSource right = new TupleSource(join.wireRight(), 256, 8, false); + final EventPrinter<TupleEvent> printer = new EventPrinter<>(); + final NonBlockingJoin join = new NonBlockingJoin(printer); + final TupleSource left = new TupleSource(join.wireLeft(), 256, 8, true); + final TupleSource right = new TupleSource(join.wireRight(), 256, 8, false); left.close(); right.close(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestTupleSource.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestTupleSource.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestTupleSource.java index ecd069c..2193cd2 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestTupleSource.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/examples/TestTupleSource.java @@ -27,8 +27,8 @@ import org.junit.Test; public class TestTupleSource { @Test public void testOneThread() throws Exception { - EventPrinter<TupleEvent> printer = new EventPrinter<>(); - TupleSource source = new TupleSource(printer, 256, 8, true); + final EventPrinter<TupleEvent> printer = new EventPrinter<>(); + final TupleSource source = new TupleSource(printer, 256, 8, true); source.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java index 2fc962d..a78e964 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java @@ -79,24 +79,24 @@ public class LargeMsgTest { @Test public void testLargeWrite() throws Exception { LoggingUtils.setLoggingLevel(Level.FINE); - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 20000, 20000); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 20000, 20000); - long dataSize = VALUES[0].length + VALUES[1].length + VALUES[2].length; + final long dataSize = VALUES[0].length + VALUES[1].length + VALUES[2].length; - EStage<TransportEvent> clientStage = new ThreadPoolStage<>("client1", + final EStage<TransportEvent> clientStage = new ThreadPoolStage<>("client1", new LoggingEventHandler<TransportEvent>(), 1, new LoggingEventHandler<Throwable>()); - EStage<TransportEvent> serverStage = new ThreadPoolStage<>("server@7001", + final EStage<TransportEvent> serverStage = new ThreadPoolStage<>("server@7001", new ServerHandler(monitor, dataSize), 1, new LoggingEventHandler<Throwable>()); final String hostAddress = this.localAddressProvider.getLocalAddress(); - int port = 7001; - Transport transport = tpFactory.newInstance(hostAddress, port, clientStage, serverStage, 1, 10000); + final int port = 7001; + final Transport transport = tpFactory.newInstance(hostAddress, port, clientStage, serverStage, 1, 10000); final Link<byte[]> link = transport.open(new InetSocketAddress(hostAddress, port), new PassThroughEncoder(), null); - EStage<byte[]> writeSubmitter = new ThreadPoolStage<>("Submitter", new EventHandler<byte[]>() { + final EStage<byte[]> writeSubmitter = new ThreadPoolStage<>("Submitter", new EventHandler<byte[]>() { @Override - public void onNext(byte[] value) { + public void onNext(final byte[] value) { link.write(value); } }, 3, new LoggingEventHandler<Throwable>()); @@ -118,15 +118,15 @@ public class LargeMsgTest { private final long expectedSize; private long accSize; - ServerHandler(Monitor monitor, long expectedSize) { + ServerHandler(final Monitor monitor, final long expectedSize) { this.monitor = monitor; this.expectedSize = expectedSize; this.accSize = 0; } @Override - public void onNext(TransportEvent value) { - byte[] data = value.getData(); + public void onNext(final TransportEvent value) { + final byte[] data = value.getData(); switch (data.length) { case L_0: http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java index 0b52ae8..7a004fe 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java @@ -46,12 +46,12 @@ public class RemoteIdentifierFactoryTest { public void testRemoteIdentifierFactory() { System.out.println(LOG_PREFIX + name.getMethodName()); - Map<String, Class<? extends Identifier>> typeToIdMap = new HashMap<String, Class<? extends Identifier>>(); + final Map<String, Class<? extends Identifier>> typeToIdMap = new HashMap<String, Class<? extends Identifier>>(); typeToIdMap.put("test", TestRemoteIdentifier.class); - IdentifierFactory factory = new DefaultIdentifierFactory(typeToIdMap); + final IdentifierFactory factory = new DefaultIdentifierFactory(typeToIdMap); - String idName = "test://name"; - Identifier id = factory.getNewInstance(idName); + final String idName = "test://name"; + final Identifier id = factory.getNewInstance(idName); System.out.println(id.toString()); Assert.assertTrue(id instanceof TestRemoteIdentifier); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java index e81b211..768fccc 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java @@ -71,15 +71,15 @@ public class RemoteManagerTest { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); - Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); + final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); clazzToCodecMap.put(StartEvent.class, new ObjectSerializableCodec<StartEvent>()); clazzToCodecMap.put(TestEvent.class, new ObjectSerializableCodec<TestEvent>()); clazzToCodecMap.put(TestEvent1.class, new ObjectSerializableCodec<TestEvent1>()); clazzToCodecMap.put(TestEvent2.class, new ObjectSerializableCodec<TestEvent2>()); - Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); + final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); final String hostAddress = localAddressProvider.getLocalAddress(); @@ -87,16 +87,16 @@ public class RemoteManagerTest { "name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider, RangeTcpPortProvider.Default); - RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); - RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); + final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); + final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); Assert.assertTrue(rm.getMyIdentifier().equals(remoteId)); - EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); - EventHandler<TestEvent1> proxyHandler1 = rm.getHandler(remoteId, TestEvent1.class); - EventHandler<TestEvent2> proxyHandler2 = rm.getHandler(remoteId, TestEvent2.class); + final EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); + final EventHandler<TestEvent1> proxyHandler1 = rm.getHandler(remoteId, TestEvent1.class); + final EventHandler<TestEvent2> proxyHandler2 = rm.getHandler(remoteId, TestEvent2.class); - AtomicInteger counter = new AtomicInteger(0); - int finalSize = 2; + final AtomicInteger counter = new AtomicInteger(0); + final int finalSize = 2; rm.registerHandler(StartEvent.class, new MessageTypeEventHandler<StartEvent>(rm, monitor, counter, finalSize)); proxyConnection.onNext(new StartEvent()); @@ -115,19 +115,19 @@ public class RemoteManagerTest { @Test public void testRemoteManagerConnectionRetryTest() throws Exception { - ExecutorService smExecutor = Executors.newFixedThreadPool(1); - ExecutorService rmExecutor = Executors.newFixedThreadPool(1); + final ExecutorService smExecutor = Executors.newFixedThreadPool(1); + final ExecutorService rmExecutor = Executors.newFixedThreadPool(1); - RemoteManager sendingManager = getTestRemoteManager("sender", 9020, 3, 2000); + final RemoteManager sendingManager = getTestRemoteManager("sender", 9020, 3, 2000); - Future<Integer> smFuture = smExecutor.submit(new SendingRemoteManagerThread(sendingManager, 9010, 20000)); + final Future<Integer> smFuture = smExecutor.submit(new SendingRemoteManagerThread(sendingManager, 9010, 20000)); Thread.sleep(1000); - RemoteManager receivingManager = getTestRemoteManager("receiver", 9010, 1, 2000); - Future<Integer> rmFuture = rmExecutor.submit(new ReceivingRemoteManagerThread(receivingManager, 20000, 1, 2)); + final RemoteManager receivingManager = getTestRemoteManager("receiver", 9010, 1, 2000); + final Future<Integer> rmFuture = rmExecutor.submit(new ReceivingRemoteManagerThread(receivingManager, 20000, 1, 2)); - int smCnt = smFuture.get(); - int rmCnt = rmFuture.get(); + final int smCnt = smFuture.get(); + final int rmCnt = rmFuture.get(); receivingManager.close(); sendingManager.close(); @@ -138,12 +138,12 @@ public class RemoteManagerTest { @Test public void testRemoteManagerConnectionRetryWithMultipleSenderTest() throws Exception { - int numOfSenderThreads = 5; - ExecutorService smExecutor = Executors.newFixedThreadPool(numOfSenderThreads); - ExecutorService rmExecutor = Executors.newFixedThreadPool(1); - ArrayList<Future<Integer>> smFutures = new ArrayList<Future<Integer>>(numOfSenderThreads); + final int numOfSenderThreads = 5; + final ExecutorService smExecutor = Executors.newFixedThreadPool(numOfSenderThreads); + final ExecutorService rmExecutor = Executors.newFixedThreadPool(1); + final ArrayList<Future<Integer>> smFutures = new ArrayList<Future<Integer>>(numOfSenderThreads); - RemoteManager sendingManager = getTestRemoteManager("sender", 9030, 3, 5000); + final RemoteManager sendingManager = getTestRemoteManager("sender", 9030, 3, 5000); for (int i = 0; i < numOfSenderThreads; i++) { smFutures.add(smExecutor.submit(new SendingRemoteManagerThread(sendingManager, 9010, 20000))); @@ -151,17 +151,17 @@ public class RemoteManagerTest { Thread.sleep(2000); - RemoteManager receivingManager = getTestRemoteManager("receiver", 9010, 1, 2000); - Future<Integer> receivingFuture = + final RemoteManager receivingManager = getTestRemoteManager("receiver", 9010, 1, 2000); + final Future<Integer> receivingFuture = rmExecutor.submit(new ReceivingRemoteManagerThread(receivingManager, 20000, numOfSenderThreads, 2)); // waiting sending remote manager. - for (Future<Integer> future : smFutures) { + for (final Future<Integer> future : smFutures) { future.get(); } // waiting receiving remote manager - int rmCnt = receivingFuture.get(); + final int rmCnt = receivingFuture.get(); sendingManager.close(); receivingManager.close(); @@ -175,15 +175,15 @@ public class RemoteManagerTest { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); - Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); + final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); clazzToCodecMap.put(StartEvent.class, new ObjectSerializableCodec<StartEvent>()); clazzToCodecMap.put(TestEvent.class, new ObjectSerializableCodec<TestEvent>()); clazzToCodecMap.put(TestEvent1.class, new ObjectSerializableCodec<TestEvent1>()); clazzToCodecMap.put(TestEvent2.class, new ObjectSerializableCodec<TestEvent2>()); - Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); + final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); final String hostAddress = localAddressProvider.getLocalAddress(); @@ -191,15 +191,15 @@ public class RemoteManagerTest { "name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000, localAddressProvider, RangeTcpPortProvider.Default); - RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); - RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); + final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); + final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); - EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); - EventHandler<TestEvent1> proxyHandler1 = rm.getHandler(remoteId, TestEvent1.class); - EventHandler<TestEvent2> proxyHandler2 = rm.getHandler(remoteId, TestEvent2.class); + final EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); + final EventHandler<TestEvent1> proxyHandler1 = rm.getHandler(remoteId, TestEvent1.class); + final EventHandler<TestEvent2> proxyHandler2 = rm.getHandler(remoteId, TestEvent2.class); - AtomicInteger counter = new AtomicInteger(0); - int finalSize = 2; + final AtomicInteger counter = new AtomicInteger(0); + final int finalSize = 2; rm.registerHandler(StartEvent.class, new MessageTypeEventHandler<StartEvent>(rm, monitor, counter, finalSize)); proxyConnection.onNext(new StartEvent()); @@ -222,26 +222,26 @@ public class RemoteManagerTest { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); - Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); + final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); clazzToCodecMap.put(TestEvent.class, new TestEventCodec()); - Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); + final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = localAddressProvider.getLocalAddress(); + final String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider, RangeTcpPortProvider.Default); - RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); - RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); + final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); + final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); - EventHandler<TestEvent> proxyHandler = rm.getHandler(remoteId, TestEvent.class); + final EventHandler<TestEvent> proxyHandler = rm.getHandler(remoteId, TestEvent.class); - AtomicInteger counter = new AtomicInteger(0); - int finalSize = 0; + final AtomicInteger counter = new AtomicInteger(0); + final int finalSize = 0; rm.registerHandler(TestEvent.class, new MessageTypeEventHandler<TestEvent>(rm, monitor, counter, finalSize)); proxyHandler.onNext(new TestEvent("hello", 0.0)); @@ -259,44 +259,45 @@ public class RemoteManagerTest { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); - Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); + final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); clazzToCodecMap.put(StartEvent.class, new ObjectSerializableCodec<StartEvent>()); clazzToCodecMap.put(TestEvent.class, new ObjectSerializableCodec<TestEvent>()); - Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); + final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); final String hostAddress = localAddressProvider.getLocalAddress(); - ExceptionHandler errorHandler = new ExceptionHandler(monitor); + final ExceptionHandler errorHandler = new ExceptionHandler(monitor); try (final RemoteManager rm = remoteManagerFactory.getInstance("name", PORT, codec, errorHandler)) { - RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); - RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); + final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); + final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); - EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); + final EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); rm.registerHandler(StartEvent.class, new ExceptionGenEventHandler<StartEvent>("recvExceptionGen")); proxyConnection.onNext(new StartEvent()); monitor.mwait(); timer.close(); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { e.printStackTrace(); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); } } - private RemoteManager getTestRemoteManager(String rmName, int localPort, int retry, int retryTimeout) { - Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); + private RemoteManager getTestRemoteManager(final String rmName, final int localPort, + final int retry, final int retryTimeout) { + final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); clazzToCodecMap.put(StartEvent.class, new ObjectSerializableCodec<StartEvent>()); clazzToCodecMap.put(TestEvent1.class, new ObjectSerializableCodec<TestEvent1>()); clazzToCodecMap.put(TestEvent2.class, new ObjectSerializableCodec<TestEvent1>()); - Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); + final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = localAddressProvider.getLocalAddress(); + final String hostAddress = localAddressProvider.getLocalAddress(); return remoteManagerFactory.getInstance(rmName, hostAddress, localPort, codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, localAddressProvider, RangeTcpPortProvider.Default); @@ -308,7 +309,7 @@ public class RemoteManagerTest { private final int timeout; private RemoteManager rm; - public SendingRemoteManagerThread(RemoteManager rm, int remotePort, int timeout) { + public SendingRemoteManagerThread(final RemoteManager rm, final int remotePort, final int timeout) { this.remotePort = remotePort; this.timeout = timeout; this.rm = rm; @@ -317,19 +318,19 @@ public class RemoteManagerTest { @Override public Integer call() throws Exception { - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), timeout, timeout); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), timeout, timeout); final String hostAddress = localAddressProvider.getLocalAddress(); - RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); - RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + remotePort); + final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); + final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + remotePort); - EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); - EventHandler<TestEvent1> proxyHandler1 = rm.getHandler(remoteId, TestEvent1.class); - EventHandler<TestEvent2> proxyHandler2 = rm.getHandler(remoteId, TestEvent2.class); + final EventHandler<StartEvent> proxyConnection = rm.getHandler(remoteId, StartEvent.class); + final EventHandler<TestEvent1> proxyHandler1 = rm.getHandler(remoteId, TestEvent1.class); + final EventHandler<TestEvent2> proxyHandler2 = rm.getHandler(remoteId, TestEvent2.class); - AtomicInteger counter = new AtomicInteger(0); - int finalSize = 0; + final AtomicInteger counter = new AtomicInteger(0); + final int finalSize = 0; rm.registerHandler(StartEvent.class, new MessageTypeEventHandler<StartEvent>(rm, monitor, counter, finalSize)); proxyConnection.onNext(new StartEvent()); @@ -351,7 +352,8 @@ public class RemoteManagerTest { private final int numOfEvent; private RemoteManager rm; - public ReceivingRemoteManagerThread(RemoteManager rm, int timeout, int numOfConnection, int numOfEvent) { + public ReceivingRemoteManagerThread(final RemoteManager rm, final int timeout, + final int numOfConnection, final int numOfEvent) { this.rm = rm; this.timeout = timeout; this.numOfConnection = numOfConnection; @@ -361,11 +363,11 @@ public class RemoteManagerTest { @Override public Integer call() throws Exception { - Monitor monitor = new Monitor(); - TimerStage timer = new TimerStage(new TimeoutHandler(monitor), timeout, timeout); + final Monitor monitor = new Monitor(); + final TimerStage timer = new TimerStage(new TimeoutHandler(monitor), timeout, timeout); - AtomicInteger counter = new AtomicInteger(0); - int finalSize = numOfConnection * numOfEvent; + final AtomicInteger counter = new AtomicInteger(0); + final int finalSize = numOfConnection * numOfEvent; rm.registerHandler(StartEvent.class, new MessageTypeEventHandler<StartEvent>(rm, monitor, counter, finalSize)); for (int i = 0; i < numOfConnection; i++) { @@ -384,7 +386,8 @@ public class RemoteManagerTest { private final AtomicInteger counter; private final int finalSize; - MessageTypeEventHandler(RemoteManager rm, Monitor monitor, AtomicInteger counter, int finalSize) { + MessageTypeEventHandler(final RemoteManager rm, final Monitor monitor, + final AtomicInteger counter, final int finalSize) { this.rm = rm; this.monitor = monitor; this.counter = counter; @@ -392,10 +395,10 @@ public class RemoteManagerTest { } @Override - public void onNext(RemoteMessage<T> value) { + public void onNext(final RemoteMessage<T> value) { - RemoteIdentifier id = value.getIdentifier(); - T message = value.getMessage(); + final RemoteIdentifier id = value.getIdentifier(); + final T message = value.getMessage(); System.out.println(this.getClass() + " " + value + " " + id.toString() + " " + message.toString()); @@ -423,7 +426,7 @@ public class RemoteManagerTest { private final AtomicInteger counter; private final int finalSize; - ConsoleEventHandler(String name, Monitor monitor, AtomicInteger counter, int finalSize) { + ConsoleEventHandler(final String name, final Monitor monitor, final AtomicInteger counter, final int finalSize) { this.name = name; this.monitor = monitor; this.counter = counter; @@ -431,7 +434,7 @@ public class RemoteManagerTest { } @Override - public void onNext(T value) { + public void onNext(final T value) { System.out.println(this.getClass() + " " + name + " " + value); if (counter.incrementAndGet() == finalSize) { System.out.println(this.getClass() + " notify counter: " + counter.get()); @@ -449,7 +452,7 @@ public class RemoteManagerTest { } @Override - public void onNext(RemoteMessage<T> value) { + public void onNext(final RemoteMessage<T> value) { System.out.println(name + " " + value); throw new TestRuntimeException("Test exception"); } @@ -464,7 +467,7 @@ public class RemoteManagerTest { } @Override - public void onNext(Throwable value) { + public void onNext(final Throwable value) { System.out.println("!!! ExceptionHandler called : " + value); monitor.mnotify(); }