Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 5d416cfc4 -> 2c7309cf6
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java deleted file mode 100644 index c4e9f51..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - - -public class TestMessageStreamImplUtil { - public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) { - return new MessageStreamImpl<M>(graph); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java deleted file mode 100644 index 9a425d1..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.data; - -import org.apache.samza.system.SystemStreamPartition; - - -/** - * Example input {@link MessageEnvelope} w/ Json message and string as the key. - */ - -public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> { - - private final String key; - private final T data; - private final Offset offset; - private final SystemStreamPartition partition; - - public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) { - this.key = key; - this.data = data; - this.offset = offset; - this.partition = partition; - } - - @Override - public T getMessage() { - return this.data; - } - - @Override - public String getKey() { - return this.key; - } - - public Offset getOffset() { - return this.offset; - } - - public SystemStreamPartition getSystemStreamPartition() { - return this.partition; - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java deleted file mode 100644 index 361972e..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.operators.TestMessageEnvelope; -import org.apache.samza.operators.TestOutputMessageEnvelope; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.hamcrest.core.IsEqual; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - - -public class TestOperatorImpl { - - TestMessageEnvelope curInputMsg; - MessageCollector curCollector; - TaskCoordinator curCoordinator; - - @Test - public void testSubscribers() { - this.curInputMsg = null; - this.curCollector = null; - this.curCoordinator = null; - OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() { - @Override - public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { - TestOperatorImpl.this.curInputMsg = message; - TestOperatorImpl.this.curCollector = collector; - TestOperatorImpl.this.curCoordinator = coordinator; - } - }; - // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext() - OperatorImpl mockSub = mock(OperatorImpl.class); - opImpl.registerNextOperator(mockSub); - TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class); - MessageCollector mockCollector = mock(MessageCollector.class); - TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - opImpl.propagateResult(xOutput, mockCollector, mockCoordinator); - verify(mockSub, times(1)).onNext( - argThat(new IsEqual<>(xOutput)), - argThat(new IsEqual<>(mockCollector)), - argThat(new IsEqual<>(mockCoordinator)) - ); - // verify onNext() is invoked correctly - TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); - opImpl.onNext(mockInput, mockCollector, mockCoordinator); - assertEquals(mockInput, this.curInputMsg); - assertEquals(mockCollector, this.curCollector); - assertEquals(mockCoordinator, this.curCoordinator); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java deleted file mode 100644 index 02637a3..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.TestMessageEnvelope; -import org.apache.samza.operators.TestMessageStreamImplUtil; -import org.apache.samza.operators.TestOutputMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.operators.windows.internal.WindowInternal; -import org.apache.samza.task.TaskContext; -import org.junit.Before; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestOperatorImpls { - Field nextOperatorsField = null; - Method createOpMethod = null; - Method createOpsMethod = null; - - @Before - public void prep() throws NoSuchFieldException, NoSuchMethodException { - nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators"); - nextOperatorsField.setAccessible(true); - - createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class, - OperatorSpec.class, Config.class, TaskContext.class); - createOpMethod.setAccessible(true); - - createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class); - createOpsMethod.setAccessible(true); - } - - @Test - public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException { - // get window operator - WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class); - WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null); - when(mockWnd.getWindow()).thenReturn(windowInternal); - MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - - OperatorGraph opGraph = new OperatorGraph(); - OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) - createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext); - assertTrue(opImpl instanceof WindowOperatorImpl); - Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window"); - wndInternalField.setAccessible(true); - WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl); - assertEquals(wndInternal, windowInternal); - - // get simple operator - StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class); - FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class); - when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext); - assertTrue(opImpl instanceof StreamOperatorImpl); - Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn"); - txfmFnField.setAccessible(true); - assertEquals(mockTxfmFn, txfmFnField.get(opImpl)); - - // get sink operator - SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { }; - SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); - when(sinkOp.getSinkFn()).thenReturn(sinkFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext); - assertTrue(opImpl instanceof SinkOperatorImpl); - Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn"); - sinkFnField.setAccessible(true); - assertEquals(sinkFn, sinkFnField.get(opImpl)); - - // get join operator - PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class); - TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class); - PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class); - when(joinOp.getTransformFn()).thenReturn(joinFn); - opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext); - assertTrue(opImpl instanceof PartialJoinOperatorImpl); - } - - @Test - public void testEmptyChain() throws InvocationTargetException, IllegalAccessException { - // test creation of empty chain - MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class); - TaskContext mockContext = mock(TaskContext.class); - Config mockConfig = mock(Config.class); - OperatorGraph opGraph = new OperatorGraph(); - RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext); - assertTrue(operatorChain != null); - } - - @Test - public void testLinearChain() throws IllegalAccessException, InvocationTargetException { - // test creation of linear chain - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); - TaskContext mockContext = mock(TaskContext.class); - Config mockConfig = mock(Config.class); - testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10))); - OperatorGraph opGraph = new OperatorGraph(); - RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext); - Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain); - assertEquals(subsSet.size(), 1); - OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next(); - Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl); - assertEquals(subsOps.size(), 1); - OperatorImpl wndOpImpl = subsOps.iterator().next(); - subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl); - assertEquals(subsOps.size(), 0); - } - - @Test - public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException { - // test creation of broadcast chain - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); - TaskContext mockContext = mock(TaskContext.class); - Config mockConfig = mock(Config.class); - testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } }); - testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m); - OperatorGraph opGraph = new OperatorGraph(); - RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext); - Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain); - assertEquals(subsSet.size(), 2); - Iterator<OperatorImpl> iter = subsSet.iterator(); - // check the first branch w/ flatMap - OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next(); - Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl); - assertEquals(subsOps.size(), 1); - OperatorImpl flatMapImpl = subsOps.iterator().next(); - subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl); - assertEquals(subsOps.size(), 0); - // check the second branch w/ map - opImpl = iter.next(); - subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl); - assertEquals(subsOps.size(), 1); - OperatorImpl mapImpl = subsOps.iterator().next(); - subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl); - assertEquals(subsOps.size(), 0); - } - - @Test - public void testJoinChain() throws IllegalAccessException, InvocationTargetException { - // test creation of join chain - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); - MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); - TaskContext mockContext = mock(TaskContext.class); - Config mockConfig = mock(Config.class); - input1 - .join(input2, - new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() { - @Override - public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) { - return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); - } - - @Override - public String getFirstKey(TestMessageEnvelope message) { - return message.getKey(); - } - - @Override - public String getSecondKey(TestMessageEnvelope message) { - return message.getKey(); - } - }) - .map(m -> m); - OperatorGraph opGraph = new OperatorGraph(); - // now, we create chained operators from each input sources - RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext); - RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext); - // check that those two chains will merge at map operator - // first branch of the join - Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1); - assertEquals(subsSet.size(), 1); - OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next(); - Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1); - assertEquals(subsOps.size(), 1); - // the map operator consumes the common join output, where two branches merge - OperatorImpl mapImpl = subsOps.iterator().next(); - // second branch of the join - subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2); - assertEquals(subsSet.size(), 1); - OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next(); - assertNotSame(joinOp1, joinOp2); - subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2); - assertEquals(subsOps.size(), 1); - // make sure that the map operator is the same - assertEquals(mapImpl, subsOps.iterator().next()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java deleted file mode 100644 index ce9fdd2..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.TestOutputMessageEnvelope; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import static org.mockito.Mockito.*; - - -public class TestSinkOperatorImpl { - - @Test - public void testSinkOperator() { - SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class); - SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class); - when(sinkOp.getSinkFn()).thenReturn(sinkFn); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext); - TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class); - MessageCollector mockCollector = mock(MessageCollector.class); - TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - - sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator); - verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java deleted file mode 100644 index 010a210..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import java.util.ArrayList; -import java.util.Collection; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.TestMessageEnvelope; -import org.apache.samza.operators.TestOutputMessageEnvelope; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import static org.mockito.Mockito.*; - - -public class TestStreamOperatorImpl { - - @Test - public void testSimpleOperator() { - StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); - FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); - when(mockOp.getTransformFn()).thenReturn(txfmFn); - MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class); - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext)); - TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class); - TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class); - Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { { - this.add(outMsg); - } }; - when(txfmFn.apply(inMsg)).thenReturn(mockOutputs); - MessageCollector mockCollector = mock(MessageCollector.class); - TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - opImpl.onNext(inMsg, mockCollector, mockCoordinator); - verify(txfmFn, times(1)).apply(inMsg); - verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java deleted file mode 100644 index 31257a4..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.spec; - -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.StreamGraphImpl; -import org.apache.samza.operators.TestMessageEnvelope; -import org.apache.samza.operators.TestMessageStreamImplUtil; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.windows.internal.WindowInternal; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - - -public class TestOperatorSpecs { - @Test - public void testGetStreamOperator() { - FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { - this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L)); - } }; - MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class); - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput); - assertEquals(strmOp.getTransformFn(), transformFn); - assertEquals(strmOp.getNextStream(), mockOutput); - } - - @Test - public void testGetSinkOperator() { - SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector, - TaskCoordinator taskCoordinator) -> { }; - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph); - assertEquals(sinkOp.getSinkFn(), sinkFn); - assertTrue(sinkOp.getNextStream() == null); - } - - @Test - public void testGetWindowOperator() throws Exception { - Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey"; - BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1; - - //instantiate a window using reflection - WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null); - - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class); - WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut); - assertEquals(spec.getWindow(), window); - assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor); - assertEquals(spec.getWindow().getFoldFunction(), aggregator); - } - - @Test - public void testGetPartialJoinOperator() { - PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger = - new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() { - @Override - public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) { - return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime()); - } - - @Override - public Object getKey(MessageEnvelope<Object, ?> message) { - return message.getKey(); - } - - @Override - public Object getOtherKey(MessageEnvelope<Object, ?> message) { - return message.getKey(); - } - }; - - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); - PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin = - OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput); - - assertEquals(partialJoin.getNextStream(), joinOutput); - MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class); - MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class); - assertEquals(partialJoin.getTransformFn(), merger); - } - - @Test - public void testGetMergeOperator() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph); - StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output); - Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { { - this.add(t); - } }; - TestMessageEnvelope t = mock(TestMessageEnvelope.class); - assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t)); - assertEquals(mergeOp.getNextStream(), output); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 813882c..5de30d8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -20,7 +20,6 @@ include \ 'samza-api', 'samza-elasticsearch', 'samza-log4j', - 'samza-operator', 'samza-rest', 'samza-shell'