This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2aec27e6cc448fbf3b2872b57403a6a35dd11316 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Tue Feb 1 14:17:51 2022 +0100 [hotfix][test] Extract SourceOperatorTestBase from SourceOperatorTest --- .../api/operators/SourceOperatorTest.java | 134 ++++++--------------- .../api/operators/SourceOperatorTestContext.java | 134 +++++++++++++++++++++ 2 files changed, 171 insertions(+), 97 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index aa0fb7e..c45492b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -18,45 +18,28 @@ limitations under the License. package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.core.io.SimpleVersionedSerialization; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.runtime.state.TestTaskStateManager; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; -import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; import org.apache.flink.streaming.runtime.io.DataInputStatus; -import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; -import org.apache.flink.streaming.util.MockOutput; -import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.util.CollectionUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -74,41 +57,31 @@ import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class SourceOperatorTest { - private static final int SUBTASK_INDEX = 1; - private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10); - - private MockSourceReader mockSourceReader; - private MockOperatorEventGateway mockGateway; - private SourceOperator<Integer, MockSourceSplit> operator; + @Nullable private SourceOperatorTestContext context; + @Nullable private SourceOperator<Integer, MockSourceSplit> operator; + @Nullable private MockSourceReader mockSourceReader; + @Nullable private MockOperatorEventGateway mockGateway; @Before public void setup() throws Exception { - this.mockSourceReader = new MockSourceReader(); - this.mockGateway = new MockOperatorEventGateway(); - this.operator = - new TestingSourceOperator<>( - mockSourceReader, - mockGateway, - SUBTASK_INDEX, - true /* emit progressive watermarks */); - Environment env = getTestingEnvironment(); - this.operator.setup( - new SourceOperatorStreamTask<Integer>(env), - new MockStreamConfig(new Configuration(), 1), - new MockOutput<>(new ArrayList<>())); - this.operator.initializeState( - new StreamTaskStateInitializerImpl(env, new MemoryStateBackend())); + context = new SourceOperatorTestContext(); + operator = context.getOperator(); + mockSourceReader = context.getSourceReader(); + mockGateway = context.getGateway(); } @After - public void cleanUp() throws Exception { - operator.close(); - assertTrue(mockSourceReader.isClosed()); + public void tearDown() throws Exception { + context.close(); + context = null; + operator = null; + mockSourceReader = null; + mockGateway = null; } @Test public void testInitializeState() throws Exception { - StateInitializationContext stateContext = getStateContext(); + StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); assertNotNull( @@ -120,11 +93,13 @@ public class SourceOperatorTest { @Test public void testOpen() throws Exception { // Initialize the operator. - operator.initializeState(getStateContext()); + operator.initializeState(context.createStateContext()); // Open the operator. operator.open(); // The source reader should have been assigned a split. - assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits()); + assertEquals( + Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT), + mockSourceReader.getAssignedSplits()); // The source reader should have started. assertTrue(mockSourceReader.isStarted()); @@ -132,17 +107,21 @@ public class SourceOperatorTest { assertEquals(1, mockGateway.getEventsSent().size()); OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0); assertTrue(operatorEvent instanceof ReaderRegistrationEvent); - assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId()); + assertEquals( + SourceOperatorTestContext.SUBTASK_INDEX, + ((ReaderRegistrationEvent) operatorEvent).subtaskId()); } @Test public void testStop() throws Exception { // Initialize the operator. - operator.initializeState(getStateContext()); + operator.initializeState(context.createStateContext()); // Open the operator. operator.open(); // The source reader should have been assigned a split. - assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits()); + assertEquals( + Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT), + mockSourceReader.getAssignedSplits()); CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); assertEquals(DataInputStatus.NOTHING_AVAILABLE, operator.emitNext(dataOutput)); @@ -158,19 +137,21 @@ public class SourceOperatorTest { @Test public void testHandleAddSplitsEvent() throws Exception { - operator.initializeState(getStateContext()); + operator.initializeState(context.createStateContext()); operator.open(); MockSourceSplit newSplit = new MockSourceSplit((2)); operator.handleOperatorEvent( new AddSplitEvent<>( Collections.singletonList(newSplit), new MockSourceSplitSerializer())); // The source reader should have been assigned two splits. - assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits()); + assertEquals( + Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit), + mockSourceReader.getAssignedSplits()); } @Test public void testHandleAddSourceEvent() throws Exception { - operator.initializeState(getStateContext()); + operator.initializeState(context.createStateContext()); operator.open(); SourceEvent event = new SourceEvent() {}; operator.handleOperatorEvent(new SourceEventWrapper(event)); @@ -180,7 +161,7 @@ public class SourceOperatorTest { @Test public void testSnapshotState() throws Exception { - StateInitializationContext stateContext = getStateContext(); + StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); operator.open(); MockSourceSplit newSplit = new MockSourceSplit((2)); @@ -192,12 +173,12 @@ public class SourceOperatorTest { // Verify the splits in state. List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get()); - assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState); + assertEquals(Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit), splitsInState); } @Test public void testNotifyCheckpointComplete() throws Exception { - StateInitializationContext stateContext = getStateContext(); + StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); operator.open(); operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L)); @@ -207,7 +188,7 @@ public class SourceOperatorTest { @Test public void testNotifyCheckpointAborted() throws Exception { - StateInitializationContext stateContext = getStateContext(); + StateInitializationContext stateContext = context.createStateContext(); operator.initializeState(stateContext); operator.open(); operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L)); @@ -222,45 +203,4 @@ public class SourceOperatorTest { assertThat(initialFuture, not(sameInstance(AvailabilityProvider.AVAILABLE))); assertThat(secondFuture, sameInstance(initialFuture)); } - - // ---------------- helper methods ------------------------- - - private StateInitializationContext getStateContext() throws Exception { - // Create a mock split. - byte[] serializedSplitWithVersion = - SimpleVersionedSerialization.writeVersionAndSerialize( - new MockSourceSplitSerializer(), MOCK_SPLIT); - - // Crate the state context. - OperatorStateStore operatorStateStore = createOperatorStateStore(); - StateInitializationContext stateContext = - new StateInitializationContextImpl(null, operatorStateStore, null, null, null); - - // Update the context. - stateContext - .getOperatorStateStore() - .getListState(SourceOperator.SPLITS_STATE_DESC) - .update(Collections.singletonList(serializedSplitWithVersion)); - - return stateContext; - } - - private OperatorStateStore createOperatorStateStore() throws Exception { - MockEnvironment env = new MockEnvironmentBuilder().build(); - final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(); - CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); - return abstractStateBackend.createOperatorStateBackend( - env, "test-operator", Collections.emptyList(), cancelStreamRegistry); - } - - private Environment getTestingEnvironment() { - return new StreamMockEnvironment( - new Configuration(), - new Configuration(), - new ExecutionConfig(), - 1L, - new MockInputSplitProvider(), - 1, - new TestTaskStateManager()); - } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java new file mode 100644 index 0000000..f8e63f2 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -0,0 +1,134 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.connector.source.mocks.MockSourceReader; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; + +import java.util.ArrayList; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Base class for testing {@link SourceOperator}. */ +@SuppressWarnings("serial") +public class SourceOperatorTestContext implements AutoCloseable { + + public static final int SUBTASK_INDEX = 1; + public static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10); + + private MockSourceReader mockSourceReader; + private MockOperatorEventGateway mockGateway; + private SourceOperator<Integer, MockSourceSplit> operator; + + public SourceOperatorTestContext() throws Exception { + mockSourceReader = new MockSourceReader(); + mockGateway = new MockOperatorEventGateway(); + operator = + new TestingSourceOperator<>( + mockSourceReader, + mockGateway, + SUBTASK_INDEX, + true /* emit progressive watermarks */); + Environment env = getTestingEnvironment(); + operator.setup( + new SourceOperatorStreamTask<Integer>(env), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>())); + operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend())); + } + + @Override + public void close() throws Exception { + operator.close(); + checkState(mockSourceReader.isClosed()); + } + + public SourceOperator<Integer, MockSourceSplit> getOperator() { + return operator; + } + + public MockOperatorEventGateway getGateway() { + return mockGateway; + } + + public MockSourceReader getSourceReader() { + return mockSourceReader; + } + + public StateInitializationContext createStateContext() throws Exception { + // Create a mock split. + byte[] serializedSplitWithVersion = + SimpleVersionedSerialization.writeVersionAndSerialize( + new MockSourceSplitSerializer(), MOCK_SPLIT); + + // Crate the state context. + OperatorStateStore operatorStateStore = createOperatorStateStore(); + StateInitializationContext stateContext = + new StateInitializationContextImpl(null, operatorStateStore, null, null, null); + + // Update the context. + stateContext + .getOperatorStateStore() + .getListState(SourceOperator.SPLITS_STATE_DESC) + .update(Collections.singletonList(serializedSplitWithVersion)); + + return stateContext; + } + + private OperatorStateStore createOperatorStateStore() throws Exception { + MockEnvironment env = new MockEnvironmentBuilder().build(); + final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + return abstractStateBackend.createOperatorStateBackend( + env, "test-operator", Collections.emptyList(), cancelStreamRegistry); + } + + private Environment getTestingEnvironment() { + return new StreamMockEnvironment( + new Configuration(), + new Configuration(), + new ExecutionConfig(), + 1L, + new MockInputSplitProvider(), + 1, + new TestTaskStateManager()); + } +}