http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java b/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java index 59b2c4d..10e0f5e 100644 --- a/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java +++ b/library/src/test/java/com/datatorrent/lib/multiwindow/SortedMovingWindowTest.java @@ -23,14 +23,16 @@ import java.util.Comparator; import java.util.HashMap; import java.util.Map; -import org.apache.commons.lang.ObjectUtils.Null; import org.junit.Assert; import org.junit.Test; -import com.datatorrent.lib.testbench.CollectorTestSink; +import org.apache.commons.lang.ObjectUtils.Null; + import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.datatorrent.lib.testbench.CollectorTestSink; + /** * A unit test to test SortedMovingWindow operator can either: * 1. sort simple comparable tuples @@ -45,7 +47,8 @@ public class SortedMovingWindowTest * Test sorting simple comparable tuples within the sliding window */ @Test - public void testSortingSimpleNumberTuple(){ + public void testSortingSimpleNumberTuple() + { SortedMovingWindow<Integer, Null> smw = new SortedMovingWindow<Integer, Null>(); CollectorTestSink<Object> testSink = new CollectorTestSink<Object>(); smw.outputPort.setSink(testSink); @@ -73,9 +76,10 @@ public class SortedMovingWindowTest SortedMovingWindow<Map<String, Integer>, Null> smw = new SortedMovingWindow<Map<String, Integer>, Null>(); - final String[] keys = { "number" }; + final String[] keys = {"number"}; - smw.setComparator(new Comparator<Map<String, Integer>>() { + smw.setComparator(new Comparator<Map<String, Integer>>() + { @Override public int compare(Map<String, Integer> o1, Map<String, Integer> o2) { @@ -89,15 +93,17 @@ public class SortedMovingWindowTest smw.setWindowSize(2); // The incoming 6 simple map tuples are disordered among 4 windows - emitObjects(smw, new Map[][] { createHashMapTuples(keys, new Integer[][] { { 1 }, { 3 } }), createHashMapTuples(keys, new Integer[][] { { 2 }, { 5 } }), - createHashMapTuples(keys, new Integer[][] { { 4 } }), createHashMapTuples(keys, new Integer[][] { { 6 } }) }); + emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Integer[][]{{1}, {3}}), + createHashMapTuples(keys, new Integer[][]{{2}, {5}}), + createHashMapTuples(keys, new Integer[][]{{4}}), createHashMapTuples(keys, new Integer[][]{{6}})}); smw.beginWindow(4); smw.endWindow(); smw.beginWindow(5); smw.endWindow(); // The outcome is ordered by the value of the key "number" - Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][] { { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 6 } })), testSink.collectedTuples); + Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][]{{1}, {2}, {3}, {4}, {5}, {6}})), + testSink.collectedTuples); } @@ -110,9 +116,10 @@ public class SortedMovingWindowTest SortedMovingWindow<Map<String, Object>, String> smw = new SortedMovingWindow<Map<String, Object>, String>(); - final String[] keys = { "name", "number" }; + final String[] keys = {"name", "number"}; - smw.setComparator(new Comparator<Map<String, Object>>() { + smw.setComparator(new Comparator<Map<String, Object>>() + { @Override public int compare(Map<String, Object> o1, Map<String, Object> o2) { @@ -121,12 +128,13 @@ public class SortedMovingWindowTest } }); - smw.setFunction(new Function<Map<String,Object>, String>() { + smw.setFunction(new Function<Map<String,Object>, String>() + { @Override public String apply(Map<String, Object> input) { // order tuple with same key "name" - return (String) input.get(keys[0]); + return (String)input.get(keys[0]); } }); CollectorTestSink<Object> testSink = new CollectorTestSink<Object>(); @@ -135,23 +143,23 @@ public class SortedMovingWindowTest smw.setWindowSize(2); // The incoming 9 complex map tuples are disordered with same name among 4 windows - emitObjects(smw, new Map[][] { createHashMapTuples(keys, new Object[][] { {"bob", 1 }, {"jim", 1 } }), createHashMapTuples(keys, new Object[][] { {"jim", 2 }, { "bob", 3 } }), - createHashMapTuples(keys, new Object[][] { { "bob", 2 }, { "jim", 4} }), createHashMapTuples(keys, new Object[][] { {"bob", 5}, {"jim", 3 }, {"bob", 4} }) }); + emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Object[][]{{"bob", 1}, {"jim", 1}}), + createHashMapTuples(keys, new Object[][]{{"jim", 2}, {"bob", 3}}), + createHashMapTuples(keys, new Object[][]{{"bob", 2}, {"jim", 4}}), + createHashMapTuples(keys, new Object[][]{{"bob", 5}, {"jim", 3}, {"bob", 4}})}); smw.beginWindow(4); smw.endWindow(); smw.beginWindow(5); smw.endWindow(); // All tuples with same "name" are sorted by key "number" - Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Object[][] { { "bob", 1 }, { "jim", 1 }, { "jim", 2 }, { "bob", 2 }, - { "bob", 3 }, { "jim", 3 }, { "jim", 4 }, { "bob", 4 }, { "bob", 5 } })), testSink.collectedTuples); + Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, + new Object[][]{{"bob", 1}, {"jim", 1}, {"jim", 2}, {"bob", 2}, {"bob", 3}, {"jim", 3}, {"jim", 4}, {"bob", 4}, {"bob", 5}})), testSink.collectedTuples); } - - - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void emitObjects(SortedMovingWindow win, Object[][] obj){ + @SuppressWarnings({"rawtypes", "unchecked"}) + private void emitObjects(SortedMovingWindow win, Object[][] obj) + { for (int i = 0; i < obj.length; i++) { win.beginWindow(i); for (int j = 0; j < obj[i].length; j++) { @@ -161,8 +169,9 @@ public class SortedMovingWindowTest } } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private Map[] createHashMapTuples(String[] cols, Object[][] values){ + @SuppressWarnings({"rawtypes", "unchecked"}) + private Map[] createHashMapTuples(String[] cols, Object[][] values) + { HashMap[] maps = new HashMap[values.length]; int index = -1;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java index 3a6f7fe..8d52f7e 100644 --- a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java +++ b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessPartitionerTest.java @@ -21,20 +21,21 @@ package com.datatorrent.lib.partitioner; import java.util.Collection; import java.util.List; -import com.google.common.collect.Lists; - import org.junit.Assert; import org.junit.Test; -import com.datatorrent.api.*; +import com.google.common.collect.Lists; + import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.Partitioner; import com.datatorrent.api.Partitioner.Partition; import com.datatorrent.api.StringCodec.Object2String; - -import com.datatorrent.lib.util.TestUtils; - import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.util.TestUtils; public class StatelessPartitionerTest { @@ -101,7 +102,7 @@ public class StatelessPartitionerTest Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0)); Assert.assertEquals("Incorrect number of partitions", 1, newPartitions.size()); - for(Partition<DummyOperator> partition: newPartitions) { + for (Partition<DummyOperator> partition : newPartitions) { Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue()); } } @@ -119,7 +120,7 @@ public class StatelessPartitionerTest Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, new PartitioningContextImpl(null, 0)); Assert.assertEquals("Incorrect number of partitions", 5, newPartitions.size()); - for(Partition<DummyOperator> partition: newPartitions) { + for (Partition<DummyOperator> partition : newPartitions) { Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue()); } } @@ -149,7 +150,7 @@ public class StatelessPartitionerTest partitions.add(mockPartition); Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, - new PartitioningContextImpl(null, 5)); + new PartitioningContextImpl(null, 5)); Assert.assertEquals("after partition", 5, newPartitions.size()); } @@ -172,7 +173,7 @@ public class StatelessPartitionerTest } Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, - new PartitioningContextImpl(null, 1)); + new PartitioningContextImpl(null, 1)); Assert.assertEquals("after partition", 1, newPartitions.size()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java index 16647ab..6eaebc3 100644 --- a/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java +++ b/library/src/test/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitionerTest.java @@ -18,7 +18,6 @@ */ package com.datatorrent.lib.partitioner; -import com.datatorrent.api.*; import java.util.Collection; import java.util.List; import java.util.Map; @@ -29,6 +28,13 @@ import org.junit.Test; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.StatsListener; import com.datatorrent.lib.util.TestUtils; /** @@ -150,8 +156,8 @@ public class StatelessThroughputBasedPartitionerTest partitions.clear(); partitions.add(mockPartition); - Collection<Partitioner.Partition<DummyOperator>> newPartitions = statelessLatencyBasedPartitioner.definePartitions(partitions, - new StatelessPartitionerTest.PartitioningContextImpl(ports, 5)); + Collection<Partitioner.Partition<DummyOperator>> newPartitions = statelessLatencyBasedPartitioner.definePartitions( + partitions, new StatelessPartitionerTest.PartitioningContextImpl(ports, 5)); Assert.assertEquals("after partition", 2, newPartitions.size()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java b/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java index f3f69fb..3ad30df 100644 --- a/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/script/JavaScriptOperatorTest.java @@ -56,7 +56,7 @@ public class JavaScriptOperatorTest // Validate value. Assert.assertEquals("number emitted tuples", 1, sink.collectedTuples.size()); for (Object o : sink.collectedTuples) { // count is 12 - Assert.assertEquals("4.0 is expected", (Double) o, 4.0, 0); + Assert.assertEquals("4.0 is expected", (Double)o, 4.0, 0); } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java index 1cf89ff..47fa2c2 100644 --- a/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/statistics/MeridianOperatorTest.java @@ -18,12 +18,12 @@ */ package com.datatorrent.lib.statistics; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - import org.junit.Assert; import org.junit.Test; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + /** * Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br> */ http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java index 158fed5..26e94c6 100644 --- a/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/statistics/ModeOperatorTest.java @@ -18,12 +18,12 @@ */ package com.datatorrent.lib.statistics; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - import org.junit.Assert; import org.junit.Test; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + /** * Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br> */ http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java b/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java index e3788c2..f9589db 100644 --- a/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/statistics/WeightedMeanOperatorTest.java @@ -18,12 +18,12 @@ */ package com.datatorrent.lib.statistics; -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; - import org.junit.Assert; import org.junit.Test; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + /** * Functional Test for {@link com.datatorrent.lib.statistics.WeightedMeanOperator}. <br> */ http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java b/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java index 76983f5..13d6097 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/ArrayListAggregatorTest.java @@ -30,36 +30,35 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class ArrayListAggregatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - public void testNodeProcessing() throws Exception - { - ArrayListAggregator<Integer> oper = new ArrayListAggregator<Integer>(); - CollectorTestSink cSink = new CollectorTestSink(); + public void testNodeProcessing() throws Exception + { + ArrayListAggregator<Integer> oper = new ArrayListAggregator<Integer>(); + CollectorTestSink cSink = new CollectorTestSink(); - oper.output.setSink(cSink); - oper.setSize(10); - int numtuples = 100; + oper.output.setSink(cSink); + oper.setSize(10); + int numtuples = 100; - oper.beginWindow(0); - for (int i = 0; i < numtuples; i++) { - oper.input.process(i); - } - oper.endWindow(); - Assert.assertEquals("number emitted tuples", 10, - cSink.collectedTuples.size()); + oper.beginWindow(0); + for (int i = 0; i < numtuples; i++) { + oper.input.process(i); + } + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 10, + cSink.collectedTuples.size()); - cSink.clear(); - oper.setSize(0); + cSink.clear(); + oper.setSize(0); - oper.beginWindow(1); - for (int i = 0; i < numtuples; i++) { - oper.input.process(i); - } - oper.endWindow(); - Assert.assertEquals("number emitted tuples", 1, - cSink.collectedTuples.size()); - ArrayList<?> list = (ArrayList<?>) cSink.collectedTuples.get(0); - Assert.assertEquals("number emitted tuples", numtuples, list.size()); - } + oper.beginWindow(1); + for (int i = 0; i < numtuples; i++) { + oper.input.process(i); + } + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size()); + ArrayList<?> list = (ArrayList<?>)cSink.collectedTuples.get(0); + Assert.assertEquals("number emitted tuples", numtuples, list.size()); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java b/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java index fe0edc8..6f1504d 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/ArrayListToItemTest.java @@ -30,29 +30,29 @@ import com.datatorrent.lib.testbench.CountTestSink; * Benchmarks: Currently does about ?? Million tuples/sec in debugging environment. Need to test on larger nodes<br> * <br> */ -public class ArrayListToItemTest { - - /** - * Test operator pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testNodeProcessing() throws Exception - { - ArrayListToItem oper = new ArrayListToItem(); - CountTestSink itemSink = new CountTestSink(); - oper.item.setSink(itemSink); +public class ArrayListToItemTest +{ + /** + * Test operator pass through. The Object passed is not relevant + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testNodeProcessing() throws Exception + { + ArrayListToItem oper = new ArrayListToItem(); + CountTestSink itemSink = new CountTestSink(); + oper.item.setSink(itemSink); - oper.beginWindow(0); - ArrayList<String> input = new ArrayList<String>(); - input.add("a"); - // Same input object can be used as the oper is just pass through - int numtuples = 1000; - for (int i = 0; i < numtuples; i++) { - oper.data.process(input); - } - - oper.endWindow(); - Assert.assertEquals("number emitted tuples", numtuples, itemSink.count); + oper.beginWindow(0); + ArrayList<String> input = new ArrayList<String>(); + input.add("a"); + // Same input object can be used as the oper is just pass through + int numtuples = 1000; + for (int i = 0; i < numtuples; i++) { + oper.data.process(input); } + + oper.endWindow(); + Assert.assertEquals("number emitted tuples", numtuples, itemSink.count); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java b/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java index 697ae34..de419fd 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/ConsolidatorKeyValTest.java @@ -32,25 +32,25 @@ import com.datatorrent.lib.util.KeyValPair; */ public class ConsolidatorKeyValTest { - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testNodeProcessing() throws Exception - { - ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer> oper = - new ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer>(); - CollectorTestSink cSink = new CollectorTestSink(); - oper.out.setSink(cSink); + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testNodeProcessing() throws Exception + { + ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer> oper = + new ConsolidatorKeyVal<String, Integer, Double, Integer, Integer, Integer>(); + CollectorTestSink cSink = new CollectorTestSink(); + oper.out.setSink(cSink); - oper.beginWindow(0); - KeyValPair<String, Integer> m1 = new KeyValPair<String, Integer>("a",1); - oper.in1.process(m1); - KeyValPair<String, Double> m2 = new KeyValPair<String, Double>("a",1.0); - oper.in2.process(m2); - oper.endWindow(); - Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size()); + oper.beginWindow(0); + KeyValPair<String, Integer> m1 = new KeyValPair<String, Integer>("a",1); + oper.in1.process(m1); + KeyValPair<String, Double> m2 = new KeyValPair<String, Double>("a",1.0); + oper.in2.process(m2); + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 1, cSink.collectedTuples.size()); - HashMap<String, ArrayList<Object>> map = (HashMap<String, ArrayList<Object>>) cSink.collectedTuples.get(0); - Assert.assertEquals("size of sink map", 1, map.size()); - } + HashMap<String, ArrayList<Object>> map = (HashMap<String, ArrayList<Object>>)cSink.collectedTuples.get(0); + Assert.assertEquals("size of sink map", 1, map.size()); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java b/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java index c9837cf..dc73a75 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/CounterTest.java @@ -27,33 +27,34 @@ import com.datatorrent.lib.testbench.CountTestSink; * Functional test for {@link com.datatorrent.lib.stream.Counter}<p> * <br> */ -public class CounterTest { - - /** - * Test oper pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testNodeProcessing() throws Exception - { - Counter oper = new Counter(); - CountTestSink cSink = new CountTestSink(); - - oper.output.setSink(cSink); - int numtuples = 100; - - oper.beginWindow(0); - for (int i = 0; i < numtuples; i++) { - oper.input.process(i); - } - oper.endWindow(); - - oper.beginWindow(1); - for (int i = 0; i < numtuples; i++) { - oper.input.process(i); - } - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", 2, cSink.getCount()); +public class CounterTest +{ + + /** + * Test oper pass through. The Object passed is not relevant + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testNodeProcessing() throws Exception + { + Counter oper = new Counter(); + CountTestSink cSink = new CountTestSink(); + + oper.output.setSink(cSink); + int numtuples = 100; + + oper.beginWindow(0); + for (int i = 0; i < numtuples; i++) { + oper.input.process(i); } + oper.endWindow(); + + oper.beginWindow(1); + for (int i = 0; i < numtuples; i++) { + oper.input.process(i); + } + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 2, cSink.getCount()); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java b/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java index 0cb5f42..6266787 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/DevNullCounterTest.java @@ -18,13 +18,12 @@ */ package com.datatorrent.lib.stream; -import com.datatorrent.lib.stream.DevNullCounter; -import com.datatorrent.lib.testbench.EventGenerator; - import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.lib.testbench.EventGenerator; + /** * * Functional tests for {@link com.datatorrent.lib.testbench.DevNullCounter}. @@ -41,29 +40,26 @@ import org.slf4j.LoggerFactory; public class DevNullCounterTest { - private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class); + private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class); - /** - * Tests both string and non string schema - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testSingleSchemaNodeProcessing() throws Exception - { - DevNullCounter oper = new DevNullCounter(); - oper.setRollingwindowcount(5); - oper.setup(null); + /** + * Tests both string and non string schema + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSingleSchemaNodeProcessing() throws Exception + { + DevNullCounter oper = new DevNullCounter(); + oper.setRollingwindowcount(5); + oper.setup(null); - oper.beginWindow(0); - long numtuples = 1000000; - Object o = new Object(); - for (long i = 0; i < numtuples; i++) { - oper.data.process(o); - } - oper.endWindow(); - LOG.info(String - .format( - "\n*******************************************************\nnumtuples(%d)", - numtuples)); - } + oper.beginWindow(0); + long numtuples = 1000000; + Object o = new Object(); + for (long i = 0; i < numtuples; i++) { + oper.data.process(o); + } + oper.endWindow(); + LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples)); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java b/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java index 4695d69..3bd9c11 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/DevNullTest.java @@ -18,26 +18,25 @@ */ package com.datatorrent.lib.stream; -import com.datatorrent.lib.stream.DevNull; -import com.datatorrent.lib.testbench.EventGenerator; - import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.lib.testbench.EventGenerator; + /** * Functional tests for {@link com.datatorrent.lib.testbench.DevNull}. */ -public class DevNullTest { - - private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class); +public class DevNullTest +{ + private static Logger LOG = LoggerFactory.getLogger(EventGenerator.class); /** * Tests both string and non string schema */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test public void testSingleSchemaNodeProcessing() throws Exception { DevNull oper = new DevNull(); @@ -49,6 +48,6 @@ public class DevNullTest { oper.data.process(o); } oper.endWindow(); - LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples)); + LOG.info(String.format("\n*******************************************************\nnumtuples(%d)", numtuples)); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java b/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java index 99305e3..314eb01 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/HashMapToKeyValPairTest.java @@ -31,35 +31,35 @@ import com.datatorrent.lib.testbench.CountTestSink; public class HashMapToKeyValPairTest { - /** - * Test oper pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testNodeProcessing() throws Exception - { - HashMapToKeyValPair oper = new HashMapToKeyValPair(); - CountTestSink keySink = new CountTestSink(); - CountTestSink valSink = new CountTestSink(); - CountTestSink keyvalSink = new CountTestSink(); + /** + * Test oper pass through. The Object passed is not relevant + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + HashMapToKeyValPair oper = new HashMapToKeyValPair(); + CountTestSink keySink = new CountTestSink(); + CountTestSink valSink = new CountTestSink(); + CountTestSink keyvalSink = new CountTestSink(); - oper.key.setSink(keySink); - oper.val.setSink(valSink); - oper.keyval.setSink(keyvalSink); + oper.key.setSink(keySink); + oper.val.setSink(valSink); + oper.keyval.setSink(keyvalSink); - oper.beginWindow(0); - HashMap<String, String> input = new HashMap<String, String>(); - input.put("a", "1"); - // Same input object can be used as the oper is just pass through - int numtuples = 1000; - for (int i = 0; i < numtuples; i++) { - oper.data.process(input); - } + oper.beginWindow(0); + HashMap<String, String> input = new HashMap<String, String>(); + input.put("a", "1"); + // Same input object can be used as the oper is just pass through + int numtuples = 1000; + for (int i = 0; i < numtuples; i++) { + oper.data.process(input); + } - oper.endWindow(); + oper.endWindow(); - Assert.assertEquals("number emitted tuples", numtuples, keySink.count); - Assert.assertEquals("number emitted tuples", numtuples, valSink.count); - Assert.assertEquals("number emitted tuples", numtuples, keyvalSink.count); - } + Assert.assertEquals("number emitted tuples", numtuples, keySink.count); + Assert.assertEquals("number emitted tuples", numtuples, valSink.count); + Assert.assertEquals("number emitted tuples", numtuples, keyvalSink.count); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java b/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java index a46eee7..e376c95 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/JsonByteArrayOperatorTest.java @@ -32,76 +32,75 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class JsonByteArrayOperatorTest { - /** - * Test json byte array to HashMap operator pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testOperator() throws Exception - { - JsonByteArrayOperator oper = new JsonByteArrayOperator(); - oper.setConcatenationCharacter('.'); - - CollectorTestSink mapSink = new CollectorTestSink(); - CollectorTestSink jsonObjectSink = new CollectorTestSink(); - CollectorTestSink flatMapSink = new CollectorTestSink(); - - oper.outputMap.setSink(mapSink); - oper.outputJsonObject.setSink(jsonObjectSink); - oper.outputFlatMap.setSink(flatMapSink); - - oper.beginWindow(0); - - // input test json string - String inputJson = " { \"@timestamp\":\"2013-09-25T19:37:23.569Z\"" - + " ,\"@version\":\"1\"" - + " ,\"type\":\"apache-logs\"" - + " ,\"host\":\"node1001\"" - + " ,\"clientip\":192.168.150.120" - + " ,\"verb\":\"GET\"" - + " ,\"request\":\"/reset.css\"" - + " ,\"httpversion\":\"1.1\"" - + " ,\"response\":200" - + " ,\"agentinfo\": {\"browser\":Firefox" - + " ,\"os\": { \"name\":\"Ubuntu\"" - + " ,\"version\":\"10.04\"" - + " }" - + " }" - + " ,\"bytes\":909.1" - + " }"; - - byte[] inputByteArray = inputJson.getBytes(); - - // run the operator for the same string 1000 times - int numtuples = 1000; - for (int i = 0; i < numtuples; i++) { - oper.input.process(inputByteArray); - } - - oper.endWindow(); - - // assert that the number of the operator generates is 1000 - Assert.assertEquals("number emitted tuples", numtuples, mapSink.collectedTuples.size()); - Assert.assertEquals("number emitted tuples", numtuples, jsonObjectSink.collectedTuples.size()); - Assert.assertEquals("number emitted tuples", numtuples, flatMapSink.collectedTuples.size()); - - // assert that value for one of the keys in any one of the objects from mapSink is as expected - Object map = mapSink.collectedTuples.get(510); - String expectedClientip = "192.168.150.120"; - Assert.assertEquals("emitted tuple", expectedClientip, ((Map)map).get("clientip")); - - - // assert that value for one of the keys in any one of the objects from jsonObjectSink is as expected - Object jsonObject = jsonObjectSink.collectedTuples.get(433); - Number expectedResponse = 200; - Assert.assertEquals("emitted tuple", expectedResponse, ((JSONObject)jsonObject).get("response")); - - // assert that value for one of the keys in any one of the objects from flatMapSink is as expected - Map flatMap = (Map)flatMapSink.collectedTuples.get(511); - String expectedBrowser = "Firefox"; - String expectedOsName = "Ubuntu"; - Assert.assertEquals("emitted tuple", expectedBrowser, flatMap.get("agentinfo.browser")); - Assert.assertEquals("emitted tuple", expectedOsName, flatMap.get("agentinfo.os.name")); + /** + * Test json byte array to HashMap operator pass through. The Object passed is not relevant + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testOperator() throws Exception + { + JsonByteArrayOperator oper = new JsonByteArrayOperator(); + oper.setConcatenationCharacter('.'); + + CollectorTestSink mapSink = new CollectorTestSink(); + CollectorTestSink jsonObjectSink = new CollectorTestSink(); + CollectorTestSink flatMapSink = new CollectorTestSink(); + + oper.outputMap.setSink(mapSink); + oper.outputJsonObject.setSink(jsonObjectSink); + oper.outputFlatMap.setSink(flatMapSink); + + oper.beginWindow(0); + + // input test json string + String inputJson = " { \"@timestamp\":\"2013-09-25T19:37:23.569Z\"" + + " ,\"@version\":\"1\"" + + " ,\"type\":\"apache-logs\"" + + " ,\"host\":\"node1001\"" + + " ,\"clientip\":192.168.150.120" + + " ,\"verb\":\"GET\"" + + " ,\"request\":\"/reset.css\"" + + " ,\"httpversion\":\"1.1\"" + + " ,\"response\":200" + + " ,\"agentinfo\": {\"browser\":Firefox" + + " ,\"os\": { \"name\":\"Ubuntu\"" + + " ,\"version\":\"10.04\"" + + " }" + + " }" + + " ,\"bytes\":909.1" + + " }"; + + byte[] inputByteArray = inputJson.getBytes(); + + // run the operator for the same string 1000 times + int numtuples = 1000; + for (int i = 0; i < numtuples; i++) { + oper.input.process(inputByteArray); } + oper.endWindow(); + + // assert that the number of the operator generates is 1000 + Assert.assertEquals("number emitted tuples", numtuples, mapSink.collectedTuples.size()); + Assert.assertEquals("number emitted tuples", numtuples, jsonObjectSink.collectedTuples.size()); + Assert.assertEquals("number emitted tuples", numtuples, flatMapSink.collectedTuples.size()); + + // assert that value for one of the keys in any one of the objects from mapSink is as expected + Object map = mapSink.collectedTuples.get(510); + String expectedClientip = "192.168.150.120"; + Assert.assertEquals("emitted tuple", expectedClientip, ((Map)map).get("clientip")); + + // assert that value for one of the keys in any one of the objects from jsonObjectSink is as expected + Object jsonObject = jsonObjectSink.collectedTuples.get(433); + Number expectedResponse = 200; + Assert.assertEquals("emitted tuple", expectedResponse, ((JSONObject)jsonObject).get("response")); + + // assert that value for one of the keys in any one of the objects from flatMapSink is as expected + Map flatMap = (Map)flatMapSink.collectedTuples.get(511); + String expectedBrowser = "Firefox"; + String expectedOsName = "Ubuntu"; + Assert.assertEquals("emitted tuple", expectedBrowser, flatMap.get("agentinfo.browser")); + Assert.assertEquals("emitted tuple", expectedOsName, flatMap.get("agentinfo.os.name")); + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java b/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java index 2b5a583..2d0595f 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/KeyPairToHashMapTest.java @@ -32,28 +32,28 @@ import com.datatorrent.lib.util.KeyValPair; public class KeyPairToHashMapTest { - /** - * Test oper pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testNodeProcessing() throws Exception - { - KeyValPairToHashMap oper = new KeyValPairToHashMap(); - CountTestSink mapSink = new CountTestSink(); - - oper.map.setSink(mapSink); - - oper.beginWindow(0); - KeyValPair<String, String> input = new KeyValPair<String, String>("a", "1"); - - // Same input object can be used as the oper is just pass through - int numtuples = 1000; - for (int i = 0; i < numtuples; i++) { - oper.keyval.process(input); - } - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", numtuples, mapSink.count); - } + /** + * Test oper pass through. The Object passed is not relevant + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testNodeProcessing() throws Exception + { + KeyValPairToHashMap oper = new KeyValPairToHashMap(); + CountTestSink mapSink = new CountTestSink(); + + oper.map.setSink(mapSink); + + oper.beginWindow(0); + KeyValPair<String, String> input = new KeyValPair<String, String>("a", "1"); + + // Same input object can be used as the oper is just pass through + int numtuples = 1000; + for (int i = 0; i < numtuples; i++) { + oper.keyval.process(input); + } + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", numtuples, mapSink.count); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java b/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java index c26c8d0..d78e369 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/RoundRobinHashMapTest.java @@ -33,59 +33,59 @@ import com.datatorrent.lib.testbench.CollectorTestSink; public class RoundRobinHashMapTest { - private static Logger log = LoggerFactory - .getLogger(RoundRobinHashMapTest.class); + private static Logger log = LoggerFactory + .getLogger(RoundRobinHashMapTest.class); - /** - * Test operator pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testNodeProcessing() throws Exception - { - RoundRobinHashMap oper = new RoundRobinHashMap(); - CollectorTestSink mapSink = new CollectorTestSink(); + /** + * Test operator pass through. The Object passed is not relevant + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + RoundRobinHashMap oper = new RoundRobinHashMap(); + CollectorTestSink mapSink = new CollectorTestSink(); - String[] keys = new String[3]; - keys[0] = "a"; - keys[1] = "b"; - keys[2] = "c"; + String[] keys = new String[3]; + keys[0] = "a"; + keys[1] = "b"; + keys[2] = "c"; - oper.setKeys(keys); - oper.map.setSink(mapSink); - oper.beginWindow(0); + oper.setKeys(keys); + oper.map.setSink(mapSink); + oper.beginWindow(0); - HashMap<String, Integer> t1 = new HashMap<String, Integer>(); - t1.put("a", 0); - t1.put("b", 1); - t1.put("c", 2); - HashMap<String, Integer> t2 = new HashMap<String, Integer>(); - t2.put("a", 3); - t2.put("b", 4); - t2.put("c", 5); - HashMap<String, Integer> t3 = new HashMap<String, Integer>(); - t3.put("a", 6); - t3.put("b", 7); - t3.put("c", 8); + HashMap<String, Integer> t1 = new HashMap<String, Integer>(); + t1.put("a", 0); + t1.put("b", 1); + t1.put("c", 2); + HashMap<String, Integer> t2 = new HashMap<String, Integer>(); + t2.put("a", 3); + t2.put("b", 4); + t2.put("c", 5); + HashMap<String, Integer> t3 = new HashMap<String, Integer>(); + t3.put("a", 6); + t3.put("b", 7); + t3.put("c", 8); - HashMap<String, Integer> t4 = new HashMap<String, Integer>(); - t4.put("a", 9); - t4.put("b", 10); - t4.put("c", 11); + HashMap<String, Integer> t4 = new HashMap<String, Integer>(); + t4.put("a", 9); + t4.put("b", 10); + t4.put("c", 11); - // Same input object can be used as the oper is just pass through - int numtuples = 12; - for (int i = 0; i < numtuples; i++) { - oper.data.process(i); - } - oper.endWindow(); + // Same input object can be used as the oper is just pass through + int numtuples = 12; + for (int i = 0; i < numtuples; i++) { + oper.data.process(i); + } + oper.endWindow(); - Assert.assertEquals("number emitted tuples", numtuples / 3, - mapSink.collectedTuples.size()); - log.debug(mapSink.collectedTuples.toString()); - Assert.assertEquals("tuple 1", t1, mapSink.collectedTuples.get(0)); - Assert.assertEquals("tuple 2", t2, mapSink.collectedTuples.get(1)); - Assert.assertEquals("tuple 3", t3, mapSink.collectedTuples.get(2)); - Assert.assertEquals("tuple 4", t4, mapSink.collectedTuples.get(3)); - } + Assert.assertEquals("number emitted tuples", numtuples / 3, + mapSink.collectedTuples.size()); + log.debug(mapSink.collectedTuples.toString()); + Assert.assertEquals("tuple 1", t1, mapSink.collectedTuples.get(0)); + Assert.assertEquals("tuple 2", t2, mapSink.collectedTuples.get(1)); + Assert.assertEquals("tuple 3", t3, mapSink.collectedTuples.get(2)); + Assert.assertEquals("tuple 4", t4, mapSink.collectedTuples.get(3)); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java b/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java index 1c05b6c..7586c70 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/StreamDuplicaterTest.java @@ -28,34 +28,35 @@ import com.datatorrent.lib.testbench.CountTestSink; * Benchmarks: Currently does about ?? Million tuples/sec in debugging environment. Need to test on larger nodes<br> * <br> */ -public class StreamDuplicaterTest { - - /** - * Test oper pass through. The Object passed is not relevant - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testNodeProcessing() throws Exception - { - StreamDuplicater oper = new StreamDuplicater(); - CountTestSink mergeSink1 = new CountTestSink(); - CountTestSink mergeSink2 = new CountTestSink(); - - oper.out1.setSink(mergeSink1); - oper.out2.setSink(mergeSink2); - - oper.beginWindow(0); - int numtuples = 1000; - Integer input = new Integer(0); - // Same input object can be used as the oper is just pass through - for (int i = 0; i < numtuples; i++) { - oper.data.process(input); - } - - oper.endWindow(); - - // One for each key - Assert.assertEquals("number emitted tuples", numtuples, mergeSink1.count); - Assert.assertEquals("number emitted tuples", numtuples, mergeSink2.count); +public class StreamDuplicaterTest +{ + + /** + * Test oper pass through. The Object passed is not relevant + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testNodeProcessing() throws Exception + { + StreamDuplicater oper = new StreamDuplicater(); + CountTestSink mergeSink1 = new CountTestSink(); + CountTestSink mergeSink2 = new CountTestSink(); + + oper.out1.setSink(mergeSink1); + oper.out2.setSink(mergeSink2); + + oper.beginWindow(0); + int numtuples = 1000; + Integer input = 0; + // Same input object can be used as the oper is just pass through + for (int i = 0; i < numtuples; i++) { + oper.data.process(input); } + + oper.endWindow(); + + // One for each key + Assert.assertEquals("number emitted tuples", numtuples, mergeSink1.count); + Assert.assertEquals("number emitted tuples", numtuples, mergeSink2.count); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java b/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java index fe626f4..995dc6f 100644 --- a/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java +++ b/library/src/test/java/com/datatorrent/lib/stream/StreamMergerTest.java @@ -43,7 +43,7 @@ public class StreamMergerTest oper.beginWindow(0); int numtuples = 500; - Integer input = new Integer(0); + Integer input = 0; // Same input object can be used as the oper is just pass through for (int i = 0; i < numtuples; i++) { oper.data1.process(input); @@ -51,6 +51,6 @@ public class StreamMergerTest } oper.endWindow(); - Assert.assertEquals("number emitted tuples", numtuples*2, mergeSink.count); + Assert.assertEquals("number emitted tuples", numtuples * 2, mergeSink.count); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java index b0d3c01..1f29d1d 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java @@ -21,8 +21,9 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.datatorrent.lib.streamquery.DeleteOperator; import com.datatorrent.lib.streamquery.condition.EqualValueCondition; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -31,44 +32,46 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class DeleteOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - DeleteOperator oper = new DeleteOperator(); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + // create operator + DeleteOperator oper = new DeleteOperator(); + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java index 97d587b..728fb96 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.streamquery.condition.Condition; import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; @@ -29,16 +31,16 @@ import com.datatorrent.lib.testbench.CollectorTestSink; public class FullOuterJoinOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - OuterJoinOperator oper = new OuterJoinOperator(); - oper.setFullJoin(true); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - + // create operator + OuterJoinOperator oper = new OuterJoinOperator(); + oper.setFullJoin(true); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + // set column join condition Condition cond = new JoinColumnEqualCondition("a", "a"); oper.setJoinCondition(cond); @@ -46,43 +48,46 @@ public class FullOuterJoinOperatorTest // add columns oper.selectTable1Column(new ColumnIndex("b", null)); oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 11); - tuple.put("c", 12); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 11); + tuple.put("c", 12); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java index 0d4c939..714f93b 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.streamquery.condition.EqualValueCondition; import com.datatorrent.lib.streamquery.function.SumFunction; @@ -32,58 +34,61 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class GroupByOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlGroupBy() { - // create operator - GroupByHavingOperator oper = new GroupByHavingOperator(); - oper.addColumnGroupByIndex(new ColumnIndex("b", null)); - try { + // create operator + GroupByHavingOperator oper = new GroupByHavingOperator(); + oper.addColumnGroupByIndex(new ColumnIndex("b", null)); + try { oper.addAggregateIndex(new SumFunction("c", null)); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); return; } - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 2); - tuple.put("c", 6); - oper.inport.process(tuple); - + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 2); + tuple.put("c", 6); + oper.inport.process(tuple); + tuple = new HashMap<String, Object>(); tuple.put("a", 1); tuple.put("b", 2); tuple.put("c", 7); oper.inport.process(tuple); - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java index 7ccb2ed..e11723d 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.streamquery.condition.EqualValueCondition; import com.datatorrent.lib.streamquery.condition.HavingCompareValue; @@ -35,57 +37,60 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class HavingOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlGroupBy() throws Exception { - // create operator - GroupByHavingOperator oper = new GroupByHavingOperator(); - oper.addColumnGroupByIndex(new ColumnIndex("b", null)); - FunctionIndex sum = new SumFunction("c", null); + // create operator + GroupByHavingOperator oper = new GroupByHavingOperator(); + oper.addColumnGroupByIndex(new ColumnIndex("b", null)); + FunctionIndex sum = new SumFunction("c", null); oper.addAggregateIndex(sum); // create having condition HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0); oper.addHavingCondition(having); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 1); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 2); - tuple.put("c", 6); - oper.inport.process(tuple); - + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 1); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 2); + tuple.put("c", 6); + oper.inport.process(tuple); + tuple = new HashMap<String, Object>(); tuple.put("a", 1); tuple.put("b", 2); tuple.put("c", 7); oper.inport.process(tuple); - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java index 2f14f16..8a022ee 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.streamquery.condition.Condition; import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; @@ -34,53 +36,56 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class InnerJoinOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - InnerJoinOperator oper = new InnerJoinOperator(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - // set column join condition - Condition cond = new JoinColumnEqualCondition("a", "a"); - oper.setJoinCondition(cond); - - // add columns - oper.selectTable1Column(new ColumnIndex("b", null)); - oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + // create operator + InnerJoinOperator oper = new InnerJoinOperator(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + // set column join condition + Condition cond = new JoinColumnEqualCondition("a", "a"); + oper.setJoinCondition(cond); + + // add columns + oper.selectTable1Column(new ColumnIndex("b", null)); + oper.selectTable2Column(new ColumnIndex("c", null)); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java index 32e5b13..aa25e87 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.streamquery.condition.Condition; import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; @@ -29,15 +31,15 @@ import com.datatorrent.lib.testbench.CollectorTestSink; public class LeftOuterJoinOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - OuterJoinOperator oper = new OuterJoinOperator(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - + // create operator + OuterJoinOperator oper = new OuterJoinOperator(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + // set column join condition Condition cond = new JoinColumnEqualCondition("a", "a"); oper.setJoinCondition(cond); @@ -45,43 +47,46 @@ public class LeftOuterJoinOperatorTest // add columns oper.selectTable1Column(new ColumnIndex("b", null)); oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 11); - tuple.put("c", 12); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 11); + tuple.put("c", 12); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java index b233290..2d7ba87 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -29,60 +31,63 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class OrderByOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // craete operator + // craete operator OrderByOperator oper = new OrderByOperator(); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - oper.addOrderByRule(new OrderByRule<Integer>("b")); - oper.setDescending(true); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("c", 2); - tuple.put("a", 0); - tuple.put("b", 1); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 2); - tuple.put("b", 6); - tuple.put("c", 6); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 4); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 8); - tuple.put("c", 4); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + oper.addOrderByRule(new OrderByRule<Integer>("b")); + oper.setDescending(true); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("c", 2); + tuple.put("a", 0); + tuple.put("b", 1); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 2); + tuple.put("b", 6); + tuple.put("c", 6); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 4); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 8); + tuple.put("c", 4); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java index f99ee25..3a57427 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.streamquery.condition.Condition; import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition; @@ -29,16 +31,16 @@ import com.datatorrent.lib.testbench.CollectorTestSink; public class RightOuterJoinOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - OuterJoinOperator oper = new OuterJoinOperator(); - oper.setRighttJoin(); - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - + // create operator + OuterJoinOperator oper = new OuterJoinOperator(); + oper.setRighttJoin(); + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + // set column join condition Condition cond = new JoinColumnEqualCondition("a", "a"); oper.setJoinCondition(cond); @@ -46,44 +48,47 @@ public class RightOuterJoinOperatorTest // add columns oper.selectTable1Column(new ColumnIndex("b", null)); oper.selectTable2Column(new ColumnIndex("c", null)); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport1.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport1.process(tuple); - - - tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 7); - tuple.put("c", 8); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport2.process(tuple); - - tuple = new HashMap<String, Object>(); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport1.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport1.process(tuple); + + + tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 7); + tuple.put("c", 8); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport2.process(tuple); + + tuple = new HashMap<String, Object>(); tuple.put("a", 2); tuple.put("b", 11); tuple.put("c", 12); oper.inport2.process(tuple); - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java index 3ac18f8..8e6620e 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java @@ -21,8 +21,9 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.datatorrent.lib.streamquery.SelectOperator; import com.datatorrent.lib.streamquery.condition.EqualValueCondition; import com.datatorrent.lib.streamquery.index.ColumnIndex; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -32,46 +33,49 @@ import com.datatorrent.lib.testbench.CollectorTestSink; */ public class SelectOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - SelectOperator oper = new SelectOperator(); - oper.addIndex(new ColumnIndex("b", null)); - oper.addIndex(new ColumnIndex("c", null)); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + // create operator + SelectOperator oper = new SelectOperator(); + oper.addIndex(new ColumnIndex("b", null)); + oper.addIndex(new ColumnIndex("c", null)); + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java index 8c894d1..c92c6c1 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java @@ -21,6 +21,8 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -54,7 +56,10 @@ public class SelectTopOperatorTest tuple.put("c", 6); oper.inport.process(tuple); oper.endWindow(); - - System.out.println(sink.collectedTuples.toString()); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class); + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java index 70713db..42af56b 100644 --- a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java @@ -21,52 +21,56 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.datatorrent.lib.streamquery.UpdateOperator; import com.datatorrent.lib.streamquery.condition.EqualValueCondition; import com.datatorrent.lib.testbench.CollectorTestSink; public class UpdateOperatorTest { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test public void testSqlSelect() { - // create operator - UpdateOperator oper = new UpdateOperator(); - - EqualValueCondition condition = new EqualValueCondition(); - condition.addEqualValue("a", 1); - oper.setCondition(condition); - oper.addUpdate("c", 100); - - CollectorTestSink sink = new CollectorTestSink(); - oper.outport.setSink(sink); - - oper.setup(null); - oper.beginWindow(1); - - HashMap<String, Object> tuple = new HashMap<String, Object>(); - tuple.put("a", 0); - tuple.put("b", 1); - tuple.put("c", 2); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 3); - tuple.put("c", 4); - oper.inport.process(tuple); - - tuple = new HashMap<String, Object>(); - tuple.put("a", 1); - tuple.put("b", 5); - tuple.put("c", 6); - oper.inport.process(tuple); - - oper.endWindow(); - oper.teardown(); - - System.out.println(sink.collectedTuples.toString()); + // create operator + UpdateOperator oper = new UpdateOperator(); + + EqualValueCondition condition = new EqualValueCondition(); + condition.addEqualValue("a", 1); + oper.setCondition(condition); + oper.addUpdate("c", 100); + + CollectorTestSink sink = new CollectorTestSink(); + oper.outport.setSink(sink); + + oper.setup(null); + oper.beginWindow(1); + + HashMap<String, Object> tuple = new HashMap<String, Object>(); + tuple.put("a", 0); + tuple.put("b", 1); + tuple.put("c", 2); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 3); + tuple.put("c", 4); + oper.inport.process(tuple); + + tuple = new HashMap<String, Object>(); + tuple.put("a", 1); + tuple.put("b", 5); + tuple.put("c", 6); + oper.inport.process(tuple); + + oper.endWindow(); + oper.teardown(); + + LOG.debug("{}", sink.collectedTuples); } + + private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class); + }
