This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2aec27e6cc448fbf3b2872b57403a6a35dd11316
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Tue Feb 1 14:17:51 2022 +0100

    [hotfix][test] Extract SourceOperatorTestBase from SourceOperatorTest
---
 .../api/operators/SourceOperatorTest.java          | 134 ++++++---------------
 .../api/operators/SourceOperatorTestContext.java   | 134 +++++++++++++++++++++
 2 files changed, 171 insertions(+), 97 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index aa0fb7e..c45492b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -18,45 +18,28 @@ limitations under the License.
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
-import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
-import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
-import org.apache.flink.streaming.util.MockOutput;
-import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -74,41 +57,31 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class SourceOperatorTest {
 
-    private static final int SUBTASK_INDEX = 1;
-    private static final MockSourceSplit MOCK_SPLIT = new 
MockSourceSplit(1234, 10);
-
-    private MockSourceReader mockSourceReader;
-    private MockOperatorEventGateway mockGateway;
-    private SourceOperator<Integer, MockSourceSplit> operator;
+    @Nullable private SourceOperatorTestContext context;
+    @Nullable private SourceOperator<Integer, MockSourceSplit> operator;
+    @Nullable private MockSourceReader mockSourceReader;
+    @Nullable private MockOperatorEventGateway mockGateway;
 
     @Before
     public void setup() throws Exception {
-        this.mockSourceReader = new MockSourceReader();
-        this.mockGateway = new MockOperatorEventGateway();
-        this.operator =
-                new TestingSourceOperator<>(
-                        mockSourceReader,
-                        mockGateway,
-                        SUBTASK_INDEX,
-                        true /* emit progressive watermarks */);
-        Environment env = getTestingEnvironment();
-        this.operator.setup(
-                new SourceOperatorStreamTask<Integer>(env),
-                new MockStreamConfig(new Configuration(), 1),
-                new MockOutput<>(new ArrayList<>()));
-        this.operator.initializeState(
-                new StreamTaskStateInitializerImpl(env, new 
MemoryStateBackend()));
+        context = new SourceOperatorTestContext();
+        operator = context.getOperator();
+        mockSourceReader = context.getSourceReader();
+        mockGateway = context.getGateway();
     }
 
     @After
-    public void cleanUp() throws Exception {
-        operator.close();
-        assertTrue(mockSourceReader.isClosed());
+    public void tearDown() throws Exception {
+        context.close();
+        context = null;
+        operator = null;
+        mockSourceReader = null;
+        mockGateway = null;
     }
 
     @Test
     public void testInitializeState() throws Exception {
-        StateInitializationContext stateContext = getStateContext();
+        StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
 
         assertNotNull(
@@ -120,11 +93,13 @@ public class SourceOperatorTest {
     @Test
     public void testOpen() throws Exception {
         // Initialize the operator.
-        operator.initializeState(getStateContext());
+        operator.initializeState(context.createStateContext());
         // Open the operator.
         operator.open();
         // The source reader should have been assigned a split.
-        assertEquals(Collections.singletonList(MOCK_SPLIT), 
mockSourceReader.getAssignedSplits());
+        assertEquals(
+                
Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT),
+                mockSourceReader.getAssignedSplits());
         // The source reader should have started.
         assertTrue(mockSourceReader.isStarted());
 
@@ -132,17 +107,21 @@ public class SourceOperatorTest {
         assertEquals(1, mockGateway.getEventsSent().size());
         OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
         assertTrue(operatorEvent instanceof ReaderRegistrationEvent);
-        assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) 
operatorEvent).subtaskId());
+        assertEquals(
+                SourceOperatorTestContext.SUBTASK_INDEX,
+                ((ReaderRegistrationEvent) operatorEvent).subtaskId());
     }
 
     @Test
     public void testStop() throws Exception {
         // Initialize the operator.
-        operator.initializeState(getStateContext());
+        operator.initializeState(context.createStateContext());
         // Open the operator.
         operator.open();
         // The source reader should have been assigned a split.
-        assertEquals(Collections.singletonList(MOCK_SPLIT), 
mockSourceReader.getAssignedSplits());
+        assertEquals(
+                
Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT),
+                mockSourceReader.getAssignedSplits());
 
         CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
         assertEquals(DataInputStatus.NOTHING_AVAILABLE, 
operator.emitNext(dataOutput));
@@ -158,19 +137,21 @@ public class SourceOperatorTest {
 
     @Test
     public void testHandleAddSplitsEvent() throws Exception {
-        operator.initializeState(getStateContext());
+        operator.initializeState(context.createStateContext());
         operator.open();
         MockSourceSplit newSplit = new MockSourceSplit((2));
         operator.handleOperatorEvent(
                 new AddSplitEvent<>(
                         Collections.singletonList(newSplit), new 
MockSourceSplitSerializer()));
         // The source reader should have been assigned two splits.
-        assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), 
mockSourceReader.getAssignedSplits());
+        assertEquals(
+                Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit),
+                mockSourceReader.getAssignedSplits());
     }
 
     @Test
     public void testHandleAddSourceEvent() throws Exception {
-        operator.initializeState(getStateContext());
+        operator.initializeState(context.createStateContext());
         operator.open();
         SourceEvent event = new SourceEvent() {};
         operator.handleOperatorEvent(new SourceEventWrapper(event));
@@ -180,7 +161,7 @@ public class SourceOperatorTest {
 
     @Test
     public void testSnapshotState() throws Exception {
-        StateInitializationContext stateContext = getStateContext();
+        StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
         operator.open();
         MockSourceSplit newSplit = new MockSourceSplit((2));
@@ -192,12 +173,12 @@ public class SourceOperatorTest {
         // Verify the splits in state.
         List<MockSourceSplit> splitsInState =
                 CollectionUtil.iterableToList(operator.getReaderState().get());
-        assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState);
+        assertEquals(Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, 
newSplit), splitsInState);
     }
 
     @Test
     public void testNotifyCheckpointComplete() throws Exception {
-        StateInitializationContext stateContext = getStateContext();
+        StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
         operator.open();
         operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 
100L));
@@ -207,7 +188,7 @@ public class SourceOperatorTest {
 
     @Test
     public void testNotifyCheckpointAborted() throws Exception {
-        StateInitializationContext stateContext = getStateContext();
+        StateInitializationContext stateContext = context.createStateContext();
         operator.initializeState(stateContext);
         operator.open();
         operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 
100L));
@@ -222,45 +203,4 @@ public class SourceOperatorTest {
         assertThat(initialFuture, 
not(sameInstance(AvailabilityProvider.AVAILABLE)));
         assertThat(secondFuture, sameInstance(initialFuture));
     }
-
-    // ---------------- helper methods -------------------------
-
-    private StateInitializationContext getStateContext() throws Exception {
-        // Create a mock split.
-        byte[] serializedSplitWithVersion =
-                SimpleVersionedSerialization.writeVersionAndSerialize(
-                        new MockSourceSplitSerializer(), MOCK_SPLIT);
-
-        // Crate the state context.
-        OperatorStateStore operatorStateStore = createOperatorStateStore();
-        StateInitializationContext stateContext =
-                new StateInitializationContextImpl(null, operatorStateStore, 
null, null, null);
-
-        // Update the context.
-        stateContext
-                .getOperatorStateStore()
-                .getListState(SourceOperator.SPLITS_STATE_DESC)
-                .update(Collections.singletonList(serializedSplitWithVersion));
-
-        return stateContext;
-    }
-
-    private OperatorStateStore createOperatorStateStore() throws Exception {
-        MockEnvironment env = new MockEnvironmentBuilder().build();
-        final AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend();
-        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
-        return abstractStateBackend.createOperatorStateBackend(
-                env, "test-operator", Collections.emptyList(), 
cancelStreamRegistry);
-    }
-
-    private Environment getTestingEnvironment() {
-        return new StreamMockEnvironment(
-                new Configuration(),
-                new Configuration(),
-                new ExecutionConfig(),
-                1L,
-                new MockInputSplitProvider(),
-                1,
-                new TestTaskStateManager());
-    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
new file mode 100644
index 0000000..f8e63f2
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -0,0 +1,134 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.connector.source.mocks.MockSourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Base class for testing {@link SourceOperator}. */
+@SuppressWarnings("serial")
+public class SourceOperatorTestContext implements AutoCloseable {
+
+    public static final int SUBTASK_INDEX = 1;
+    public static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 
10);
+
+    private MockSourceReader mockSourceReader;
+    private MockOperatorEventGateway mockGateway;
+    private SourceOperator<Integer, MockSourceSplit> operator;
+
+    public SourceOperatorTestContext() throws Exception {
+        mockSourceReader = new MockSourceReader();
+        mockGateway = new MockOperatorEventGateway();
+        operator =
+                new TestingSourceOperator<>(
+                        mockSourceReader,
+                        mockGateway,
+                        SUBTASK_INDEX,
+                        true /* emit progressive watermarks */);
+        Environment env = getTestingEnvironment();
+        operator.setup(
+                new SourceOperatorStreamTask<Integer>(env),
+                new MockStreamConfig(new Configuration(), 1),
+                new MockOutput<>(new ArrayList<>()));
+        operator.initializeState(new StreamTaskStateInitializerImpl(env, new 
MemoryStateBackend()));
+    }
+
+    @Override
+    public void close() throws Exception {
+        operator.close();
+        checkState(mockSourceReader.isClosed());
+    }
+
+    public SourceOperator<Integer, MockSourceSplit> getOperator() {
+        return operator;
+    }
+
+    public MockOperatorEventGateway getGateway() {
+        return mockGateway;
+    }
+
+    public MockSourceReader getSourceReader() {
+        return mockSourceReader;
+    }
+
+    public StateInitializationContext createStateContext() throws Exception {
+        // Create a mock split.
+        byte[] serializedSplitWithVersion =
+                SimpleVersionedSerialization.writeVersionAndSerialize(
+                        new MockSourceSplitSerializer(), MOCK_SPLIT);
+
+        // Crate the state context.
+        OperatorStateStore operatorStateStore = createOperatorStateStore();
+        StateInitializationContext stateContext =
+                new StateInitializationContextImpl(null, operatorStateStore, 
null, null, null);
+
+        // Update the context.
+        stateContext
+                .getOperatorStateStore()
+                .getListState(SourceOperator.SPLITS_STATE_DESC)
+                .update(Collections.singletonList(serializedSplitWithVersion));
+
+        return stateContext;
+    }
+
+    private OperatorStateStore createOperatorStateStore() throws Exception {
+        MockEnvironment env = new MockEnvironmentBuilder().build();
+        final AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend();
+        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+        return abstractStateBackend.createOperatorStateBackend(
+                env, "test-operator", Collections.emptyList(), 
cancelStreamRegistry);
+    }
+
+    private Environment getTestingEnvironment() {
+        return new StreamMockEnvironment(
+                new Configuration(),
+                new Configuration(),
+                new ExecutionConfig(),
+                1L,
+                new MockInputSplitProvider(),
+                1,
+                new TestTaskStateManager());
+    }
+}

Reply via email to