Repository: flink
Updated Branches:
  refs/heads/master 39115abe4 -> 4cb967089


http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
new file mode 100644
index 0000000..19d05a8
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.api.TestTopologyBuilder;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.google.common.collect.Sets;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@PowerMockIgnore("javax.*")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WrapperSetupHelper.class)
+public class WrapperSetupHelperTest extends AbstractTest {
+
+       @Test
+       public void testEmptyDeclarerBolt() {
+               IComponent boltOrSpout;
+
+               if (this.r.nextBoolean()) {
+                       boltOrSpout = mock(IRichSpout.class);
+               } else {
+                       boltOrSpout = mock(IRichBolt.class);
+               }
+
+               Assert.assertEquals(new HashMap<String, Integer>(),
+                               
WrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testRawType() throws Exception {
+               IComponent boltOrSpout;
+
+               if (this.r.nextBoolean()) {
+                       boltOrSpout = mock(IRichSpout.class);
+               } else {
+                       boltOrSpout = mock(IRichBolt.class);
+               }
+
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+               declarer.declare(new Fields("dummy1", "dummy2"));
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               WrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
+                               Sets.newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testToManyAttributes() throws Exception {
+               IComponent boltOrSpout;
+
+               if (this.r.nextBoolean()) {
+                       boltOrSpout = mock(IRichSpout.class);
+               } else {
+                       boltOrSpout = mock(IRichBolt.class);
+               }
+
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+               final String[] schema = new String[26];
+               for (int i = 0; i < schema.length; ++i) {
+                       schema[i] = "a" + i;
+               }
+               declarer.declare(new Fields(schema));
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               WrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null);
+       }
+
+       @Test
+       public void testTupleTypes() throws Exception {
+               for (int i = -1; i < 26; ++i) {
+                       this.testTupleTypes(i);
+               }
+       }
+
+       private void testTupleTypes(final int numberOfAttributes) throws 
Exception {
+               String[] schema;
+               if (numberOfAttributes == -1) {
+                       schema = new String[1];
+               } else {
+                       schema = new String[numberOfAttributes];
+               }
+               for (int i = 0; i < schema.length; ++i) {
+                       schema[i] = "a" + i;
+               }
+
+               IComponent boltOrSpout;
+               if (this.r.nextBoolean()) {
+                       boltOrSpout = mock(IRichSpout.class);
+               } else {
+                       boltOrSpout = mock(IRichBolt.class);
+               }
+
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+               declarer.declare(new Fields(schema));
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               HashMap<String, Integer> attributes = new HashMap<String, 
Integer>();
+               attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
+
+               Assert.assertEquals(attributes, 
WrapperSetupHelper.getNumberOfAttributes(
+                               boltOrSpout,
+                               numberOfAttributes == -1 ? Sets
+                                               .newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }) : null));
+       }
+
+       @Test
+       public void testCreateTopologyContext() {
+               HashMap<String, Integer> dops = new HashMap<String, Integer>();
+               dops.put("spout1", 1);
+               dops.put("spout2", 3);
+               dops.put("bolt1", 1);
+               dops.put("bolt2", 2);
+               dops.put("sink", 1);
+
+               HashMap<String, Integer> taskCounter = new HashMap<String, 
Integer>();
+               taskCounter.put("spout1", 0);
+               taskCounter.put("spout2", 0);
+               taskCounter.put("bolt1", 0);
+               taskCounter.put("bolt2", 0);
+               taskCounter.put("sink", 0);
+
+               HashMap<String, IComponent> operators = new HashMap<String, 
IComponent>();
+               operators.put("spout1", new TestDummySpout());
+               operators.put("spout2", new TestDummySpout());
+               operators.put("bolt1", new TestDummyBolt());
+               operators.put("bolt2", new TestDummyBolt());
+               operators.put("sink", new TestSink());
+
+               TopologyBuilder builder = new TopologyBuilder();
+
+               builder.setSpout("spout1", (IRichSpout) 
operators.get("spout1"), dops.get("spout1"));
+               builder.setSpout("spout2", (IRichSpout) 
operators.get("spout2"), dops.get("spout2"));
+               builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), 
dops.get("bolt1")).shuffleGrouping("spout1");
+               builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
+               builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
+               .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+               .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+               final int maxRetry = 3;
+               int counter;
+               for (counter = 0; counter < maxRetry; ++counter) {
+                       LocalCluster cluster = new LocalCluster();
+                       Config c = new Config();
+                       c.setNumAckers(0);
+                       cluster.submitTopology("test", c, 
builder.createTopology());
+                       Utils.sleep((counter + 1) * 5000);
+                       cluster.shutdown();
+
+                       if (TestSink.result.size() == 8) {
+                               break;
+                       }
+               }
+               Assert.assertTrue(counter < maxRetry);
+
+               TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
+
+               flinkBuilder.setSpout("spout1", (IRichSpout) 
operators.get("spout1"), dops.get("spout1"));
+               flinkBuilder.setSpout("spout2", (IRichSpout) 
operators.get("spout2"), dops.get("spout2"));
+               flinkBuilder.setBolt("bolt1", (IRichBolt) 
operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+               flinkBuilder.setBolt("bolt2", (IRichBolt) 
operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+               flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
+               .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+               .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+               flinkBuilder.createTopology();
+               StormTopology stormTopology = flinkBuilder.getStormTopology();
+
+               Set<Integer> taskIds = new HashSet<Integer>();
+
+               for (TopologyContext expectedContext : TestSink.result) {
+                       final String thisComponentId = 
expectedContext.getThisComponentId();
+                       int index = taskCounter.get(thisComponentId);
+
+                       StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+                       when(context.getTaskName()).thenReturn(thisComponentId);
+                       
when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
+                       when(context.getIndexOfThisSubtask()).thenReturn(index);
+                       taskCounter.put(thisComponentId, ++index);
+
+                       Config stormConfig = new Config();
+                       stormConfig.put(WrapperSetupHelper.TOPOLOGY_NAME, 
"test");
+
+                       TopologyContext topologyContext = 
WrapperSetupHelper.createTopologyContext(
+                                       context, 
operators.get(thisComponentId), stormTopology, stormConfig);
+
+                       ComponentCommon expcetedCommon = 
expectedContext.getComponentCommon(thisComponentId);
+                       ComponentCommon common = 
topologyContext.getComponentCommon(thisComponentId);
+
+                       Assert.assertNull(topologyContext.getCodeDir());
+                       Assert.assertNull(common.get_json_conf());
+                       
Assert.assertNull(topologyContext.getExecutorData(null));
+                       Assert.assertNull(topologyContext.getPIDDir());
+                       Assert.assertNull(topologyContext.getResource(null));
+                       Assert.assertNull(topologyContext.getSharedExecutor());
+                       Assert.assertNull(expectedContext.getTaskData(null));
+                       Assert.assertNull(topologyContext.getThisWorkerPort());
+
+                       
Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
+
+                       Assert.assertEquals(expcetedCommon.get_inputs(), 
common.get_inputs());
+                       
Assert.assertEquals(expcetedCommon.get_parallelism_hint(), 
common.get_parallelism_hint());
+                       Assert.assertEquals(expcetedCommon.get_streams(), 
common.get_streams());
+                       Assert.assertEquals(expectedContext.getComponentIds(), 
topologyContext.getComponentIds());
+                       
Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
+                                       
topologyContext.getComponentStreams(thisComponentId));
+                       Assert.assertEquals(thisComponentId, 
topologyContext.getThisComponentId());
+                       Assert.assertEquals(expectedContext.getThisSources(), 
topologyContext.getThisSources());
+                       Assert.assertEquals(expectedContext.getThisStreams(), 
topologyContext.getThisStreams());
+                       Assert.assertEquals(expectedContext.getThisTargets(), 
topologyContext.getThisTargets());
+                       Assert.assertEquals(0, 
topologyContext.getThisWorkerTasks().size());
+
+                       for (int taskId : 
topologyContext.getComponentTasks(thisComponentId)) {
+                               Assert.assertEquals(thisComponentId, 
topologyContext.getComponentId(taskId));
+                       }
+
+                       for (String componentId : 
expectedContext.getComponentIds()) {
+                               
Assert.assertEquals(expectedContext.getSources(componentId),
+                                               
topologyContext.getSources(componentId));
+                               
Assert.assertEquals(expectedContext.getTargets(componentId),
+                                               
topologyContext.getTargets(componentId));
+
+                               for (String streamId : 
expectedContext.getComponentStreams(componentId)) {
+                                       Assert.assertEquals(
+                                                       
expectedContext.getComponentOutputFields(componentId, streamId).toList(),
+                                                       
topologyContext.getComponentOutputFields(componentId, streamId).toList());
+                               }
+                       }
+
+                       for (String streamId : 
expectedContext.getThisStreams()) {
+                               
Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
+                                               
topologyContext.getThisOutputFields(streamId).toList());
+                       }
+
+                       HashMap<Integer, String> taskToComponents = new 
HashMap<Integer, String>();
+                       Set<Integer> allTaskIds = new HashSet<Integer>();
+                       for (String componentId : 
expectedContext.getComponentIds()) {
+                               List<Integer> possibleTasks = 
expectedContext.getComponentTasks(componentId);
+                               List<Integer> tasks = 
topologyContext.getComponentTasks(componentId);
+
+                               Iterator<Integer> p_it = 
possibleTasks.iterator();
+                               Iterator<Integer> t_it = tasks.iterator();
+                               while(p_it.hasNext()) {
+                                       Assert.assertTrue(t_it.hasNext());
+                                       
Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
+                                       
Assert.assertTrue(allTaskIds.add(t_it.next()));
+                               }
+                               Assert.assertFalse(t_it.hasNext());
+                       }
+
+                       Assert.assertEquals(taskToComponents, 
expectedContext.getTaskToComponent());
+                       
Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
+
+                       try {
+                               topologyContext.getHooks();
+                               Assert.fail();
+                       } catch (UnsupportedOperationException e) { /* expected 
*/ }
+
+                       try {
+                               topologyContext.getRegisteredMetricByName(null);
+                               Assert.fail();
+                       } catch (UnsupportedOperationException e) { /* expected 
*/ }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/resources/log4j-test.properties 
b/flink-contrib/flink-storm/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/resources/log4j.properties 
b/flink-contrib/flink-storm/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=OFF, console
+
+# Log to the console (root logger is OFF by default)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/resources/logback-test.xml 
b/flink-contrib/flink-storm/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..4f56748
--- /dev/null
+++ b/flink-contrib/flink-storm/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index 9038363..074edfa 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -32,7 +32,8 @@ under the License.
        </parent>
 
        <modules>
-               <module>flink-storm-compatibility</module>
+               <module>flink-storm</module>
+               <module>flink-storm-examples</module>
                <module>flink-streaming-contrib</module>
                <module>flink-tweet-inputformat</module>
                <module>flink-operator-stats</module>

Reply via email to