Repository: incubator-apex-malhar Updated Branches: refs/heads/master cf3bb7df0 -> ab76dacd3
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java new file mode 100644 index 0000000..b2d1e8b --- /dev/null +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; + +/** + * A test class which test your own stream implementation build on default one + */ +@SuppressWarnings("unchecked") +public class MyStreamTest +{ + static Map<Object, Integer> expected = new HashMap<>(); + static String testId = null; + static Callable<Boolean> exitCondition = null; + static { + expected.put("newword1", 4); + expected.put("newword2", 8); + expected.put("newword3", 4); + expected.put("newword4", 4); + expected.put("newword5", 4); + expected.put("newword7", 4); + expected.put("newword9", 6); + + exitCondition = new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId); + return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1)); + } + }; + } + + @Test + public void testMethodChainWordcount() throws Exception + { + + testId = "testMethodChainWordcount"; + + TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>(); + collector.id = testId; + new MyStream<>(StreamFactory.fromFolder("./src/test/resources/data")) + .<String, MyStream<String>>flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split(" ")); + } + }).myFilterAndMap(new Function.MapFunction<String, String>() + { + @Override + public String f(String input) + { + return input.replace("word", "newword"); + } + }, new Function.FilterFunction<String>() + { + @Override + public Boolean f(String input) + { + return input.startsWith("word"); + } + }).countByKey() + .addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + + + List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId); + Assert.assertTrue(data.size() > 1); + Assert.assertEquals(expected, data.get(data.size() - 1)); + } + + @Test + public void testNonMethodChainWordcount() throws Exception + { + testId = "testNonMethodChainWordcount"; + + TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>(); + collector.id = testId; + MyStream<String> mystream = new MyStream<>(StreamFactory + .fromFolder("./src/test/resources/data")) + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split(" ")); + } + }); + mystream.myFilterAndMap(new Function.MapFunction<String, String>() + { + @Override + public String f(String input) + { + return input.replace("word", "newword"); + } + }, new Function.FilterFunction<String>() + { + @Override + public Boolean f(String input) + { + return input.startsWith("word"); + } + }).countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + + + List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId); + Assert.assertTrue(data.size() > 1); + Assert.assertEquals(expected, data.get(data.size() - 1)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java new file mode 100644 index 0000000..a5c644d --- /dev/null +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * A helper class for assertion of results collected + */ +public class TupleCollector<T> extends BaseOperator +{ + + public static volatile Map<String, List<?>> results = new HashMap<>(); + + public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<>(this); + + public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>(); + + public String id = ""; + + public static class CollectorInputPort<T> extends DefaultInputPort<T> + { + TupleCollector ownerOperator; + + List<T> list; + + public CollectorInputPort(TupleCollector ownerOperator) + { + super(); + this.ownerOperator = ownerOperator; + } + + @Override + @SuppressWarnings("unchecked") + public void process(T tuple) + { + list.add(tuple); + ownerOperator.outputPort.emit(tuple); + } + + @Override + public void setConnected(boolean flag) + { + if (flag) { + results.put(ownerOperator.id, list = new LinkedList<>()); + } + } + } + + + + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/resources/data/word.txt ---------------------------------------------------------------------- diff --git a/stream/src/test/resources/data/word.txt b/stream/src/test/resources/data/word.txt new file mode 100644 index 0000000..7e28409 --- /dev/null +++ b/stream/src/test/resources/data/word.txt @@ -0,0 +1,2 @@ +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
