http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java new file mode 100644 index 0000000..f51aba4 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.util; + +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +public abstract class AbstractTest { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class); + + protected long seed; + protected Random r; + + @Before + public void prepare() { + this.seed = System.currentTimeMillis(); + this.r = new Random(this.seed); + LOG.info("Test seed: {}", new Long(this.seed)); + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java new file mode 100644 index 0000000..1b320e5 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.util; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +import java.util.Map; + +public class FiniteTestSpout implements IRichSpout { + private static final long serialVersionUID = 7992419478267824279L; + + private int numberOfOutputTuples; + private SpoutOutputCollector collector; + + public FiniteTestSpout(final int numberOfOutputTuples) { + this.numberOfOutputTuples = numberOfOutputTuples; + } + + @SuppressWarnings("rawtypes") + @Override + public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void close() {/* nothing to do */} + + @Override + public void activate() {/* nothing to do */} + + @Override + public void deactivate() {/* nothing to do */} + + @Override + public void nextTuple() { + if (--this.numberOfOutputTuples >= 0) { + this.collector.emit(new Values(new Integer(this.numberOfOutputTuples))); + } + } + + @Override + public void ack(final Object msgId) {/* nothing to do */} + + @Override + public void fail(final Object msgId) {/* nothing to do */} + + @Override + public void declareOutputFields(final OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("dummy")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java new file mode 100644 index 0000000..17de427 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.util; + +import java.util.Iterator; + +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormStreamSelector; +import org.junit.Assert; +import org.junit.Test; + +public class StormStreamSelectorTest { + + @Test + public void testSelector() { + StormStreamSelector<Object> selector = new StormStreamSelector<Object>(); + SplitStreamType<Object> tuple = new SplitStreamType<Object>(); + Iterator<String> result; + + tuple.streamId = "stream1"; + result = selector.select(tuple).iterator(); + Assert.assertEquals("stream1", result.next()); + Assert.assertFalse(result.hasNext()); + + tuple.streamId = "stream2"; + result = selector.select(tuple).iterator(); + Assert.assertEquals("stream2", result.next()); + Assert.assertFalse(result.hasNext()); + + tuple.streamId = "stream1"; + result = selector.select(tuple).iterator(); + Assert.assertEquals("stream1", result.next()); + Assert.assertFalse(result.hasNext()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java new file mode 100644 index 0000000..b7458df --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.util; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +public class TestDummyBolt implements IRichBolt { + private static final long serialVersionUID = 6893611247443121322L; + + public final static String shuffleStreamId = "shuffleStream"; + public final static String groupingStreamId = "groupingStream"; + + private boolean emit = true; + @SuppressWarnings("rawtypes") + public Map config; + private TopologyContext context; + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.config = stormConf; + this.context = context; + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + if (this.context.getThisTaskIndex() == 0) { + this.collector.emit(shuffleStreamId, input.getValues()); + } + if (this.emit) { + this.collector.emit(groupingStreamId, new Values("bolt", this.context)); + this.emit = false; + } + } + + @Override + public void cleanup() {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(shuffleStreamId, new Fields("data")); + declarer.declareStream(groupingStreamId, new Fields("id", "data")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java new file mode 100644 index 0000000..ed9ffff --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.util; + +import java.util.Map; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; + +public class TestDummySpout implements IRichSpout { + private static final long serialVersionUID = -5190945609124603118L; + + public final static String spoutStreamId = "spout-stream"; + + private boolean emit = true; + @SuppressWarnings("rawtypes") + public Map config; + private TopologyContext context; + private SpoutOutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.config = conf; + this.context = context; + this.collector = collector; + } + + @Override + public void close() {} + + @Override + public void activate() {} + + @Override + public void deactivate() {} + + @Override + public void nextTuple() { + if (this.emit) { + this.collector.emit(new Values(this.context)); + this.emit = false; + } + } + + @Override + public void ack(Object msgId) {} + + @Override + public void fail(Object msgId) {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data")); + declarer.declareStream(spoutStreamId, new Fields("id", "data")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java new file mode 100644 index 0000000..59939fd --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.util; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; + +public class TestSink implements IRichBolt { + private static final long serialVersionUID = 4314871456719370877L; + + public final static List<TopologyContext> result = new LinkedList<TopologyContext>(); + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + result.add(context); + } + + @Override + public void execute(Tuple input) { + if (input.size() == 1) { + result.add((TopologyContext) input.getValue(0)); + } else { + result.add((TopologyContext) input.getValue(1)); + } + } + + @Override + public void cleanup() {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) {} + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java new file mode 100644 index 0000000..3d7d26b --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.tuple.Values; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.wrappers.BoltCollector; +import org.apache.flink.streaming.api.operators.Output; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class BoltCollectorTest extends AbstractTest { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testBoltStormCollector() throws InstantiationException, IllegalAccessException { + for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) { + final Output flinkCollector = mock(Output.class); + Tuple flinkTuple = null; + final Values tuple = new Values(); + + BoltCollector<?> collector; + + final String streamId = "streamId"; + HashMap<String, Integer> attributes = new HashMap<String, Integer>(); + attributes.put(streamId, numberOfAttributes); + + if (numberOfAttributes == -1) { + collector = new BoltCollector(attributes, flinkCollector); + tuple.add(new Integer(this.r.nextInt())); + + } else { + collector = new BoltCollector(attributes, flinkCollector); + flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance(); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(new Integer(this.r.nextInt())); + flinkTuple.setField(tuple.get(i), i); + } + } + + final Collection anchors = mock(Collection.class); + final List<Integer> taskIds; + taskIds = collector.emit(streamId, anchors, tuple); + + Assert.assertNull(taskIds); + + if (numberOfAttributes == -1) { + verify(flinkCollector).collect(tuple.get(0)); + } else { + verify(flinkCollector).collect(flinkTuple); + } + } + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testEmitDirect() { + new BoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null, + null, null); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java new file mode 100644 index 0000000..e33fdb9 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormConfig; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.wrappers.BoltWrapper; +import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer; +import org.apache.flink.storm.wrappers.StormTuple; +import org.apache.flink.storm.wrappers.WrapperSetupHelper; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.same; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class}) +public class BoltWrapperTest extends AbstractTest { + + @Test(expected = IllegalArgumentException.class) + public void testWrapperRawType() throws Exception { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + declarer.declare(new Fields("dummy1", "dummy2")); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + + new BoltWrapper<Object, Object>(mock(IRichBolt.class), + new String[] { Utils.DEFAULT_STREAM_ID }); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrapperToManyAttributes1() throws Exception { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + final String[] schema = new String[26]; + for (int i = 0; i < schema.length; ++i) { + schema[i] = "a" + i; + } + declarer.declare(new Fields(schema)); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + + new BoltWrapper<Object, Object>(mock(IRichBolt.class)); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrapperToManyAttributes2() throws Exception { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + final String[] schema = new String[26]; + for (int i = 0; i < schema.length; ++i) { + schema[i] = "a" + i; + } + declarer.declare(new Fields(schema)); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + + new BoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {}); + } + + @Test + public void testWrapper() throws Exception { + for (int i = -1; i < 26; ++i) { + this.testWrapper(i); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void testWrapper(final int numberOfAttributes) throws Exception { + assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25)); + Tuple flinkTuple = null; + String rawTuple = null; + + if (numberOfAttributes == -1) { + rawTuple = "test"; + } else { + flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance(); + } + + final String[] schema; + if (numberOfAttributes == -1) { + schema = new String[1]; + } else { + schema = new String[numberOfAttributes]; + } + for (int i = 0; i < schema.length; ++i) { + schema[i] = "a" + i; + } + + final StreamRecord record = mock(StreamRecord.class); + if (numberOfAttributes == -1) { + when(record.getValue()).thenReturn(rawTuple); + } else { + when(record.getValue()).thenReturn(flinkTuple); + } + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final IRichBolt bolt = mock(IRichBolt.class); + + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + declarer.declare(new Fields(schema)); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + + final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null); + wrapper.setup(mock(Output.class), taskContext); + wrapper.open(null); + + wrapper.processElement(record); + if (numberOfAttributes == -1) { + verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null))); + } else { + verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null))); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testMultipleOutputStreams() throws Exception { + final boolean rawOutType1 = super.r.nextBoolean(); + final boolean rawOutType2 = super.r.nextBoolean(); + + final StreamRecord record = mock(StreamRecord.class); + when(record.getValue()).thenReturn(2).thenReturn(3); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final Output output = mock(Output.class); + + final TestBolt bolt = new TestBolt(); + final HashSet<String> raw = new HashSet<String>(); + if (rawOutType1) { + raw.add("stream1"); + } + if (rawOutType2) { + raw.add("stream2"); + } + + final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw); + wrapper.setup(output, taskContext); + wrapper.open(null); + + final SplitStreamType splitRecord = new SplitStreamType<Integer>(); + if (rawOutType1) { + splitRecord.streamId = "stream1"; + splitRecord.value = 2; + } else { + splitRecord.streamId = "stream1"; + splitRecord.value = new Tuple1<Integer>(2); + } + wrapper.processElement(record); + verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0)); + + if (rawOutType2) { + splitRecord.streamId = "stream2"; + splitRecord.value = 3; + } else { + splitRecord.streamId = "stream2"; + splitRecord.value = new Tuple1<Integer>(3); + } + wrapper.processElement(record); + verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0)); + } + + @SuppressWarnings("unchecked") + @Test + public void testOpen() throws Exception { + final StormConfig stormConfig = new StormConfig(); + final Configuration flinkConfig = new Configuration(); + + final ExecutionConfig taskConfig = mock(ExecutionConfig.class); + when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig) + .thenReturn(flinkConfig); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(taskConfig); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + declarer.declare(new Fields("dummy")); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + + final IRichBolt bolt = mock(IRichBolt.class); + + BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); + wrapper.setup(mock(Output.class), taskContext); + + // test without configuration + wrapper.open(null); + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); + + // test with StormConfig + wrapper.open(null); + verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), + any(OutputCollector.class)); + + // test with Configuration + final TestDummyBolt testBolt = new TestDummyBolt(); + wrapper = new BoltWrapper<Object, Object>(testBolt); + wrapper.setup(mock(Output.class), taskContext); + + wrapper.open(null); + for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) { + Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey())); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testOpenSink() throws Exception { + final StormConfig stormConfig = new StormConfig(); + final Configuration flinkConfig = new Configuration(); + + final ExecutionConfig taskConfig = mock(ExecutionConfig.class); + when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig) + .thenReturn(flinkConfig); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(taskConfig); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final IRichBolt bolt = mock(IRichBolt.class); + + BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); + wrapper.setup(mock(Output.class), taskContext); + + // test without configuration + wrapper.open(null); + verify(bolt).prepare(any(Map.class), any(TopologyContext.class), + isNull(OutputCollector.class)); + + // test with StormConfig + wrapper.open(null); + verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), + isNull(OutputCollector.class)); + + // test with Configuration + final TestDummyBolt testBolt = new TestDummyBolt(); + wrapper = new BoltWrapper<Object, Object>(testBolt); + wrapper.setup(mock(Output.class), taskContext); + + wrapper.open(null); + for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) { + Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey())); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testClose() throws Exception { + final IRichBolt bolt = mock(IRichBolt.class); + + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + declarer.declare(new Fields("dummy")); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + + final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + wrapper.setup(mock(Output.class), taskContext); + + wrapper.close(); + wrapper.dispose(); + + verify(bolt).cleanup(); + } + + private static final class TestBolt implements IRichBolt { + private static final long serialVersionUID = 7278692872260138758L; + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + int counter = 0; + @Override + public void execute(backtype.storm.tuple.Tuple input) { + if (++counter % 2 == 1) { + this.collector.emit("stream1", new Values(input.getInteger(0))); + } else { + this.collector.emit("stream2", new Values(input.getInteger(0))); + } + } + + @Override + public void cleanup() {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream("stream1", new Fields("a1")); + declarer.declareStream("stream2", new Fields("a2")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java new file mode 100644 index 0000000..69d4a8e --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import java.util.HashMap; + +import backtype.storm.generated.Bolt; +import backtype.storm.generated.SpoutSpec; +import backtype.storm.generated.StateSpoutSpec; +import backtype.storm.generated.StormTopology; +import backtype.storm.metric.api.ICombiner; +import backtype.storm.metric.api.IMetric; +import backtype.storm.metric.api.IReducer; + +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.wrappers.FlinkTopologyContext; +import org.junit.Test; + + +/* + * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here, + * because those are tested in StormWrapperSetupHelperTest. + */ +public class FlinkTopologyContextTest extends AbstractTest { + + @Test(expected = UnsupportedOperationException.class) + public void testAddTaskHook() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .addTaskHook(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetHooks() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .getHooks(); + } + + @SuppressWarnings("rawtypes") + @Test(expected = UnsupportedOperationException.class) + public void testRegisteredMetric1() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .registerMetric(null, (ICombiner) null, 0); + } + + @SuppressWarnings("rawtypes") + @Test(expected = UnsupportedOperationException.class) + public void testRegisteredMetric2() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .registerMetric(null, (IReducer) null, 0); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRegisteredMetric3() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .registerMetric(null, (IMetric) null, 0); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetRegisteredMetricByName() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .getRegisteredMetricByName(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testSetAllSubscribedState() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .setAllSubscribedState(null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testSetSubscribedState1() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .setSubscribedState(null, null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testSetSubscribedState2() { + new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(), + new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null, + null, null, null, null, null, null, null, null, null, null, null, null, null) + .setSubscribedState(null, null, null); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java new file mode 100644 index 0000000..4618101 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; + +public class SetupOutputFieldsDeclarerTest extends AbstractTest { + + @Test + public void testDeclare() { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + + int numberOfAttributes = this.r.nextInt(26); + declarer.declare(createSchema(numberOfAttributes)); + Assert.assertEquals(1, declarer.outputSchemas.size()); + Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID) + .intValue()); + + final String sid = "streamId"; + numberOfAttributes = this.r.nextInt(26); + declarer.declareStream(sid, createSchema(numberOfAttributes)); + Assert.assertEquals(2, declarer.outputSchemas.size()); + Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue()); + } + + private Fields createSchema(final int numberOfAttributes) { + final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes); + for (int i = 0; i < numberOfAttributes; ++i) { + schema.add("a" + i); + } + return new Fields(schema); + } + + @Test + public void testDeclareDirect() { + new SetupOutputFieldsDeclarer().declare(false, new Fields()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeclareDirectFail() { + new SetupOutputFieldsDeclarer().declare(true, new Fields()); + } + + @Test + public void testDeclareStream() { + new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields()); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareStreamFail() { + new SetupOutputFieldsDeclarer().declareStream(null, new Fields()); + } + + @Test + public void testDeclareFullStream() { + new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields()); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareFullStreamFailNonDefaultStream() { + new SetupOutputFieldsDeclarer().declareStream(null, false, new Fields()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeclareFullStreamFailDirect() { + new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java new file mode 100644 index 0000000..6b60d2b --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.tuple.Values; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.wrappers.SpoutCollector; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class SpoutCollectorTest extends AbstractTest { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException { + for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) { + final SourceContext flinkCollector = mock(SourceContext.class); + Tuple flinkTuple = null; + final Values tuple = new Values(); + + SpoutCollector<?> collector; + + final String streamId = "streamId"; + HashMap<String, Integer> attributes = new HashMap<String, Integer>(); + attributes.put(streamId, numberOfAttributes); + + if (numberOfAttributes == -1) { + collector = new SpoutCollector(attributes, flinkCollector); + tuple.add(new Integer(this.r.nextInt())); + + } else { + collector = new SpoutCollector(attributes, flinkCollector); + flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance(); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(new Integer(this.r.nextInt())); + flinkTuple.setField(tuple.get(i), i); + } + } + + final List<Integer> taskIds; + final Object messageId = new Integer(this.r.nextInt()); + + taskIds = collector.emit(streamId, tuple, messageId); + + Assert.assertNull(taskIds); + + if (numberOfAttributes == -1) { + verify(flinkCollector).collect(tuple.get(0)); + } else { + verify(flinkCollector).collect(flinkTuple); + } + } + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testEmitDirect() { + new SpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect( + 0, null, null, + (Object) null); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java new file mode 100644 index 0000000..227d736 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.tuple.Fields; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.util.FiniteSpout; +import org.apache.flink.storm.util.FiniteTestSpout; +import org.apache.flink.storm.util.StormConfig; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer; +import org.apache.flink.storm.wrappers.SpoutWrapper; +import org.apache.flink.storm.wrappers.WrapperSetupHelper; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(WrapperSetupHelper.class) +public class SpoutWrapperTest extends AbstractTest { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testRunPrepare() throws Exception { + final StormConfig stormConfig = new StormConfig(); + stormConfig.put(this.r.nextInt(), this.r.nextInt()); + final Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger("testKey", this.r.nextInt()); + + final ExecutionConfig taskConfig = mock(ExecutionConfig.class); + when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig) + .thenReturn(flinkConfig); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(taskConfig); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final IRichSpout spout = mock(IRichSpout.class); + SpoutWrapper spoutWrapper = new SpoutWrapper(spout); + spoutWrapper.setRuntimeContext(taskContext); + spoutWrapper.cancel(); + + // test without configuration + spoutWrapper.run(mock(SourceContext.class)); + verify(spout).open(any(Map.class), any(TopologyContext.class), + any(SpoutOutputCollector.class)); + + // test with StormConfig + spoutWrapper.run(mock(SourceContext.class)); + verify(spout).open(eq(stormConfig), any(TopologyContext.class), + any(SpoutOutputCollector.class)); + + // test with Configuration + final TestDummySpout testSpout = new TestDummySpout(); + spoutWrapper = new SpoutWrapper(testSpout); + spoutWrapper.setRuntimeContext(taskContext); + spoutWrapper.cancel(); + + spoutWrapper.run(mock(SourceContext.class)); + for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) { + Assert.assertEquals(entry.getValue(), testSpout.config.get(entry.getKey())); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testRunExecuteFixedNumber() throws Exception { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + declarer.declare(new Fields("dummy")); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments() + .thenReturn(declarer); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final IRichSpout spout = mock(IRichSpout.class); + final int numberOfCalls = this.r.nextInt(50); + final SpoutWrapper<?> spoutWrapper = new SpoutWrapper<Object>(spout, + numberOfCalls); + spoutWrapper.setRuntimeContext(taskContext); + + spoutWrapper.run(mock(SourceContext.class)); + verify(spout, times(numberOfCalls)).nextTuple(); + } + + @Test + public void testRunExecuteFinite() throws Exception { + final int numberOfCalls = this.r.nextInt(50); + + final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>(); + for (int i = numberOfCalls - 1; i >= 0; --i) { + expectedResult.add(new Tuple1<Integer>(new Integer(i))); + } + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final FiniteTestSpout spout = new FiniteTestSpout(numberOfCalls); + final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>( + spout, -1); + spoutWrapper.setRuntimeContext(taskContext); + + final TestContext collector = new TestContext(); + spoutWrapper.run(collector); + + Assert.assertEquals(expectedResult, collector.result); + } + + @SuppressWarnings("unchecked") + @Test + public void runAndExecuteFiniteSpout() throws Exception { + final FiniteSpout stormSpout = mock(FiniteSpout.class); + when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout); + wrapper.setRuntimeContext(taskContext); + + wrapper.run(mock(SourceContext.class)); + verify(stormSpout, times(3)).nextTuple(); + } + + @SuppressWarnings("unchecked") + @Test + public void runAndExecuteFiniteSpout2() throws Exception { + final FiniteSpout stormSpout = mock(FiniteSpout.class); + when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout); + wrapper.setRuntimeContext(taskContext); + + wrapper.run(mock(SourceContext.class)); + verify(stormSpout, never()).nextTuple(); + } + + @Test + public void testCancel() throws Exception { + final int numberOfCalls = 5 + this.r.nextInt(5); + + final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); + when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); + + final IRichSpout spout = new FiniteTestSpout(numberOfCalls); + + final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout); + spoutWrapper.setRuntimeContext(taskContext); + + spoutWrapper.cancel(); + final TestContext collector = new TestContext(); + spoutWrapper.run(collector); + + Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result); + } + + @Test + public void testClose() throws Exception { + final IRichSpout spout = mock(IRichSpout.class); + final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout); + + spoutWrapper.close(); + + verify(spout).close(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java new file mode 100644 index 0000000..155fcd9 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.storm.util.AbstractTest; +import org.apache.flink.storm.wrappers.StormTuple; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class StormTupleTest extends AbstractTest { + private static final String fieldName = "fieldName"; + private static final String fieldNamePojo = "member"; + + private int arity, index; + + @Override + @Before + public void prepare() { + super.prepare(); + this.arity = 1 + r.nextInt(25); + this.index = r.nextInt(this.arity); + } + + @Test + public void nonTupleTest() { + final Object flinkTuple = this.r.nextInt(); + + final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple, null); + Assert.assertSame(flinkTuple, tuple.getValue(0)); + + final List<Object> values = tuple.getValues(); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(flinkTuple, values.get(0)); + } + + @Test + public void tupleTest() throws InstantiationException, IllegalAccessException { + final int numberOfAttributes = this.r.nextInt(26); + final Object[] data = new Object[numberOfAttributes]; + + final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance(); + for (int i = 0; i < numberOfAttributes; ++i) { + data[i] = this.r.nextInt(); + flinkTuple.setField(data[i], i); + } + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + final List<Object> values = tuple.getValues(); + + Assert.assertEquals(numberOfAttributes, values.size()); + for (int i = 0; i < numberOfAttributes; ++i) { + Assert.assertEquals(flinkTuple.getField(i), values.get(i)); + } + + Assert.assertEquals(numberOfAttributes, tuple.size()); + } + + @Test + public void testBinary() { + final byte[] data = new byte[this.r.nextInt(15)]; + this.r.nextBytes(data); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index)); + } + + @Test + public void testBoolean() { + final Boolean flinkTuple = this.r.nextBoolean(); + + final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getBoolean(0)); + } + + @Test + public void testByte() { + final Byte flinkTuple = (byte) this.r.nextInt(); + + final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getByte(0)); + } + + @Test + public void testDouble() { + final Double flinkTuple = this.r.nextDouble(); + + final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getDouble(0)); + } + + @Test + public void testFloat() { + final Float flinkTuple = this.r.nextFloat(); + + final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getFloat(0)); + } + + @Test + public void testInteger() { + final Integer flinkTuple = this.r.nextInt(); + + final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getInteger(0)); + } + + @Test + public void testLong() { + final Long flinkTuple = this.r.nextLong(); + + final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getLong(0)); + } + + @Test + public void testShort() { + final Short flinkTuple = (short) this.r.nextInt(); + + final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getShort(0)); + } + + @Test + public void testString() { + final byte[] data = new byte[this.r.nextInt(15)]; + this.r.nextBytes(data); + final String flinkTuple = new String(data); + + final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null); + Assert.assertEquals(flinkTuple, tuple.getString(0)); + } + + @Test + public void testBinaryTuple() { + final byte[] data = new byte[this.r.nextInt(15)]; + this.r.nextBytes(data); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index)); + } + + @Test + public void testBooleanTuple() { + final Boolean data = this.r.nextBoolean(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index)); + } + + @Test + public void testByteTuple() { + final Byte data = (byte) this.r.nextInt(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index)); + } + + @Test + public void testDoubleTuple() { + final Double data = this.r.nextDouble(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index)); + } + + @Test + public void testFloatTuple() { + final Float data = this.r.nextFloat(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index)); + } + + @Test + public void testIntegerTuple() { + final Integer data = this.r.nextInt(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index)); + } + + @Test + public void testLongTuple() { + final Long data = this.r.nextLong(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index)); + } + + @Test + public void testShortTuple() { + final Short data = (short) this.r.nextInt(); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index)); + } + + @Test + public void testStringTuple() { + final byte[] rawdata = new byte[this.r.nextInt(15)]; + this.r.nextBytes(rawdata); + final String data = new String(rawdata); + + final int index = this.r.nextInt(5); + final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>(); + flinkTuple.setField(data, index); + + final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null); + Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index)); + } + + @Test + public void testContains() throws Exception { + Fields schema = new Fields("a1", "a2"); + StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(), + schema); + + Assert.assertTrue(tuple.contains("a1")); + Assert.assertTrue(tuple.contains("a2")); + Assert.assertFalse(tuple.contains("a3")); + } + + @Test + public void testGetFields() throws Exception { + Fields schema = new Fields(); + + Assert.assertSame(schema, new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(), + schema).getFields()); + + Assert.assertSame(null, new StormTuple<Object>(null, schema).getFields()); + + } + + @Test + public void testFieldIndex() throws Exception { + Fields schema = new Fields("a1", "a2"); + StormTuple<Object> tuple = new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(), + schema); + + Assert.assertEquals(0, tuple.fieldIndex("a1")); + Assert.assertEquals(1, tuple.fieldIndex("a2")); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSelect() throws Exception { + Tuple tuple = Tuple.getTupleClass(arity).newInstance(); + Values values = new Values(); + + ArrayList<String> attributeNames = new ArrayList<String>(arity); + ArrayList<String> filterNames = new ArrayList<String>(arity); + + for (int i = 0; i < arity; ++i) { + tuple.setField(i, i); + attributeNames.add("a" + i); + + if (r.nextBoolean()) { + filterNames.add("a" + i); + values.add(i); + } + } + Fields schema = new Fields(attributeNames); + Fields selector = new Fields(filterNames); + + Assert.assertEquals(values, new StormTuple(tuple, schema).select(selector)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetValueByField() throws Exception { + Object value = mock(Object.class); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getValueByField(fieldName)); + + } + + @Test + public void testGetValueByFieldPojo() throws Exception { + Object value = mock(Object.class); + TestPojoMember<Object> pojo = new TestPojoMember<Object>(value); + StormTuple<TestPojoMember<Object>> tuple = new StormTuple<TestPojoMember<Object>>(pojo, + null); + Assert.assertSame(value, tuple.getValueByField(fieldNamePojo)); + } + + @Test + public void testGetValueByFieldPojoGetter() throws Exception { + Object value = mock(Object.class); + TestPojoGetter<Object> pojo = new TestPojoGetter<Object>(value); + StormTuple<TestPojoGetter<Object>> tuple = new StormTuple<TestPojoGetter<Object>>(pojo, + null); + Assert.assertSame(value, tuple.getValueByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetStringByField() throws Exception { + String value = "stringValue"; + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getStringByField(fieldName)); + } + + @Test + public void testGetStringByFieldPojo() throws Exception { + String value = "stringValue"; + TestPojoMember<String> pojo = new TestPojoMember<String>(value); + StormTuple<TestPojoMember<String>> tuple = new StormTuple<TestPojoMember<String>>(pojo, + null); + Assert.assertSame(value, tuple.getStringByField(fieldNamePojo)); + } + + @Test + public void testGetStringByFieldPojoGetter() throws Exception { + String value = "stringValue"; + TestPojoGetter<String> pojo = new TestPojoGetter<String>(value); + StormTuple<TestPojoGetter<String>> tuple = new StormTuple<TestPojoGetter<String>>(pojo, + null); + Assert.assertSame(value, tuple.getStringByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetIntegerByField() throws Exception { + Integer value = r.nextInt(); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getIntegerByField(fieldName)); + } + + @Test + public void testGetIntegerByFieldPojo() throws Exception { + Integer value = r.nextInt(); + TestPojoMember<Integer> pojo = new TestPojoMember<Integer>(value); + StormTuple<TestPojoMember<Integer>> tuple = new StormTuple<TestPojoMember<Integer>>(pojo, + null); + Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo)); + } + + @Test + public void testGetIntegerByFieldPojoGetter() throws Exception { + Integer value = r.nextInt(); + TestPojoGetter<Integer> pojo = new TestPojoGetter<Integer>(value); + StormTuple<TestPojoGetter<Integer>> tuple = new StormTuple<TestPojoGetter<Integer>>(pojo, + null); + Assert.assertSame(value, tuple.getIntegerByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetLongByField() throws Exception { + Long value = r.nextLong(); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getLongByField(fieldName)); + } + + @Test + public void testGetLongByFieldPojo() throws Exception { + Long value = r.nextLong(); + TestPojoMember<Long> pojo = new TestPojoMember<Long>(value); + StormTuple<TestPojoMember<Long>> tuple = new StormTuple<TestPojoMember<Long>>(pojo, + null); + Assert.assertSame(value, tuple.getLongByField(fieldNamePojo)); + } + + @Test + public void testGetLongByFieldPojoGetter() throws Exception { + Long value = r.nextLong(); + TestPojoGetter<Long> pojo = new TestPojoGetter<Long>(value); + StormTuple<TestPojoGetter<Long>> tuple = new StormTuple<TestPojoGetter<Long>>(pojo, + null); + Assert.assertSame(value, tuple.getLongByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetBooleanByField() throws Exception { + Boolean value = r.nextBoolean(); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertEquals(value, tuple.getBooleanByField(fieldName)); + } + + @Test + public void testGetBooleanByFieldPojo() throws Exception { + Boolean value = r.nextBoolean(); + TestPojoMember<Boolean> pojo = new TestPojoMember<Boolean>(value); + StormTuple<TestPojoMember<Boolean>> tuple = new StormTuple<TestPojoMember<Boolean>>(pojo, + null); + Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo)); + } + + @Test + public void testGetBooleanByFieldPojoGetter() throws Exception { + Boolean value = r.nextBoolean(); + TestPojoGetter<Boolean> pojo = new TestPojoGetter<Boolean>(value); + StormTuple<TestPojoGetter<Boolean>> tuple = new StormTuple<TestPojoGetter<Boolean>>(pojo, + null); + Assert.assertSame(value, tuple.getBooleanByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetShortByField() throws Exception { + Short value = (short) r.nextInt(); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getShortByField(fieldName)); + } + + @Test + public void testGetShortByFieldPojo() throws Exception { + Short value = (short) r.nextInt(); + TestPojoMember<Short> pojo = new TestPojoMember<Short>(value); + StormTuple<TestPojoMember<Short>> tuple = new StormTuple<TestPojoMember<Short>>(pojo, + null); + Assert.assertSame(value, tuple.getShortByField(fieldNamePojo)); + } + + @Test + public void testGetShortByFieldPojoGetter() throws Exception { + Short value = (short) r.nextInt(); + TestPojoGetter<Short> pojo = new TestPojoGetter<Short>(value); + StormTuple<TestPojoGetter<Short>> tuple = new StormTuple<TestPojoGetter<Short>>(pojo, + null); + Assert.assertSame(value, tuple.getShortByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetByteByField() throws Exception { + Byte value = new Byte((byte) r.nextInt()); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getByteByField(fieldName)); + } + + @Test + public void testGetByteByFieldPojo() throws Exception { + Byte value = new Byte((byte) r.nextInt()); + TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value); + StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo, + null); + Assert.assertSame(value, tuple.getByteByField(fieldNamePojo)); + } + + @Test + public void testGetByteByFieldPojoGetter() throws Exception { + Byte value = new Byte((byte) r.nextInt()); + TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value); + StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo, + null); + Assert.assertSame(value, tuple.getByteByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetDoubleByField() throws Exception { + Double value = r.nextDouble(); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getDoubleByField(fieldName)); + } + + @Test + public void testGetDoubleByFieldPojo() throws Exception { + Double value = r.nextDouble(); + TestPojoMember<Double> pojo = new TestPojoMember<Double>(value); + StormTuple<TestPojoMember<Double>> tuple = new StormTuple<TestPojoMember<Double>>(pojo, + null); + Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo)); + } + + @Test + public void testGetDoubleByFieldPojoGetter() throws Exception { + Double value = r.nextDouble(); + TestPojoGetter<Double> pojo = new TestPojoGetter<Double>(value); + StormTuple<TestPojoGetter<Double>> tuple = new StormTuple<TestPojoGetter<Double>>(pojo, + null); + Assert.assertSame(value, tuple.getDoubleByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetFloatByField() throws Exception { + Float value = r.nextFloat(); + StormTuple tuple = testGetByField(arity, index, value); + Assert.assertSame(value, tuple.getFloatByField(fieldName)); + } + + @Test + public void testGetFloatByFieldPojo() throws Exception { + Float value = r.nextFloat(); + TestPojoMember<Float> pojo = new TestPojoMember<Float>(value); + StormTuple<TestPojoMember<Float>> tuple = new StormTuple<TestPojoMember<Float>>(pojo, + null); + Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo)); + } + + @Test + public void testGetFloatByFieldPojoGetter() throws Exception { + Float value = r.nextFloat(); + TestPojoGetter<Float> pojo = new TestPojoGetter<Float>(value); + StormTuple<TestPojoGetter<Float>> tuple = new StormTuple<TestPojoGetter<Float>>(pojo, + null); + Assert.assertSame(value, tuple.getFloatByField(fieldNamePojo)); + } + + @SuppressWarnings("rawtypes") + @Test + public void testGetBinaryByField() throws Exception { + byte[] data = new byte[1 + r.nextInt(20)]; + r.nextBytes(data); + StormTuple tuple = testGetByField(arity, index, data); + Assert.assertSame(data, tuple.getBinaryByField(fieldName)); + } + + @Test + public void testGetBinaryFieldPojo() throws Exception { + byte[] data = new byte[1 + r.nextInt(20)]; + r.nextBytes(data); + TestPojoMember<byte[]> pojo = new TestPojoMember<byte[]>(data); + StormTuple<TestPojoMember<byte[]>> tuple = new StormTuple<TestPojoMember<byte[]>>(pojo, + null); + Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo)); + } + + @Test + public void testGetBinaryByFieldPojoGetter() throws Exception { + byte[] data = new byte[1 + r.nextInt(20)]; + r.nextBytes(data); + TestPojoGetter<byte[]> pojo = new TestPojoGetter<byte[]>(data); + StormTuple<TestPojoGetter<byte[]>> tuple = new StormTuple<TestPojoGetter<byte[]>>(pojo, + null); + Assert.assertSame(data, tuple.getBinaryByField(fieldNamePojo)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <T> StormTuple testGetByField(int arity, int index, T value) + throws Exception { + + assert (index < arity); + + Tuple tuple = Tuple.getTupleClass(arity).newInstance(); + tuple.setField(value, index); + + ArrayList<String> attributeNames = new ArrayList<String>(arity); + for(int i = 0; i < arity; ++i) { + if(i == index) { + attributeNames.add(fieldName); + } else { + attributeNames.add("" + i); + } + } + Fields schema = new Fields(attributeNames); + + return new StormTuple(tuple, schema); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetSourceGlobalStreamid() { + new StormTuple<Object>(null, null).getSourceGlobalStreamid(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetSourceComponent() { + new StormTuple<Object>(null, null).getSourceComponent(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetSourceTask() { + new StormTuple<Object>(null, null).getSourceTask(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetSourceStreamId() { + new StormTuple<Object>(null, null).getSourceStreamId(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testGetMessageId() { + new StormTuple<Object>(null, null).getMessageId(); + } + + public static class TestPojoMember<T> { + public T member; + + public TestPojoMember(T value) { + this.member = value; + } + } + + public static class TestPojoGetter<T> { + private T member; + + public TestPojoGetter(T value) { + this.member = value; + } + + public T getMember() { + return this.member; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java new file mode 100644 index 0000000..4c4749a --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.watermark.Watermark; + +import java.util.LinkedList; + +class TestContext implements SourceContext<Tuple1<Integer>> { + public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>(); + + public TestContext() { + } + + @Override + public void collect(final Tuple1<Integer> record) { + this.result.add(record.copy()); + } + + @Override + public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) { + this.result.add(element.copy()); + } + + @Override + public void emitWatermark(Watermark mark) { + // ignore it + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } +}