Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master cf3bb7df0 -> ab76dacd3


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
----------------------------------------------------------------------
diff --git 
a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java 
b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
new file mode 100644
index 0000000..b2d1e8b
--- /dev/null
+++ 
b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+/**
+ * A test class which test your own stream implementation build on default one
+ */
+@SuppressWarnings("unchecked")
+public class MyStreamTest
+{
+  static Map<Object, Integer> expected = new HashMap<>();
+  static String testId = null;
+  static Callable<Boolean> exitCondition = null;
+  static {
+    expected.put("newword1", 4);
+    expected.put("newword2", 8);
+    expected.put("newword3", 4);
+    expected.put("newword4", 4);
+    expected.put("newword5", 4);
+    expected.put("newword7", 4);
+    expected.put("newword9", 6);
+
+    exitCondition = new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        List<Map<Object, Integer>> data = (List<Map<Object, 
Integer>>)TupleCollector.results.get(testId);
+        return (data != null) && data.size() >= 1 && 
expected.equals(data.get(data.size() - 1));
+      }
+    };
+  }
+
+  @Test
+  public void testMethodChainWordcount() throws Exception
+  {
+
+    testId = "testMethodChainWordcount";
+
+    TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+    collector.id = testId;
+    new MyStream<>(StreamFactory.fromFolder("./src/test/resources/data"))
+        .<String, MyStream<String>>flatMap(new 
Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split(" "));
+          }
+        }).myFilterAndMap(new Function.MapFunction<String, String>()
+        {
+          @Override
+          public String f(String input)
+          {
+            return input.replace("word", "newword");
+          }
+        }, new Function.FilterFunction<String>()
+        {
+          @Override
+          public Boolean f(String input)
+          {
+            return input.startsWith("word");
+          }
+        }).countByKey()
+        .addOperator(collector, collector.inputPort, 
collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+
+
+    List<Map<Object, Integer>> data = (List<Map<Object, 
Integer>>)TupleCollector.results.get(testId);
+    Assert.assertTrue(data.size() > 1);
+    Assert.assertEquals(expected, data.get(data.size() - 1));
+  }
+
+  @Test
+  public void testNonMethodChainWordcount() throws Exception
+  {
+    testId = "testNonMethodChainWordcount";
+
+    TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+    collector.id = testId;
+    MyStream<String> mystream = new MyStream<>(StreamFactory
+        .fromFolder("./src/test/resources/data"))
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split(" "));
+          }
+        });
+    mystream.myFilterAndMap(new Function.MapFunction<String, String>()
+    {
+      @Override
+      public String f(String input)
+      {
+        return input.replace("word", "newword");
+      }
+    }, new Function.FilterFunction<String>()
+    {
+      @Override
+      public Boolean f(String input)
+      {
+        return input.startsWith("word");
+      }
+    }).countByKey().addOperator(collector, collector.inputPort, 
collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+
+
+    List<Map<Object, Integer>> data = (List<Map<Object, 
Integer>>)TupleCollector.results.get(testId);
+    Assert.assertTrue(data.size() > 1);
+    Assert.assertEquals(expected, data.get(data.size() - 1));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
----------------------------------------------------------------------
diff --git 
a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java 
b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
new file mode 100644
index 0000000..a5c644d
--- /dev/null
+++ 
b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * A helper class for assertion of results collected
+ */
+public class TupleCollector<T> extends BaseOperator
+{
+
+  public static volatile Map<String, List<?>> results = new HashMap<>();
+
+  public final transient CollectorInputPort<T> inputPort = new 
CollectorInputPort<>(this);
+
+  public final transient DefaultOutputPort<T> outputPort = new 
DefaultOutputPort<>();
+
+  public String id = "";
+
+  public static class CollectorInputPort<T> extends DefaultInputPort<T>
+  {
+    TupleCollector ownerOperator;
+
+    List<T> list;
+
+    public CollectorInputPort(TupleCollector ownerOperator)
+    {
+      super();
+      this.ownerOperator = ownerOperator;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void process(T tuple)
+    {
+      list.add(tuple);
+      ownerOperator.outputPort.emit(tuple);
+    }
+
+    @Override
+    public void setConnected(boolean flag)
+    {
+      if (flag) {
+        results.put(ownerOperator.id, list = new LinkedList<>());
+      }
+    }
+  }
+
+
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/stream/src/test/resources/data/word.txt 
b/stream/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/stream/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 
word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 
word5 word4 word2 word1 error

Reply via email to