[flink] 02/04: [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles

2023-08-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4467ad05bb9823d90be770d5e1c36989f3905d1b
Author: wangfeifan 
AuthorDate: Mon Jul 24 14:44:02 2023 +0800

[refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks 
for state handles
---
 .../checkpoint/CheckpointCoordinatorTest.java  | 47 --
 .../checkpoint/metadata/CheckpointTestUtils.java   | 13 +++---
 ...Handle.java => DiscardRecordedStateObject.java} | 36 +++--
 .../IncrementalRemoteKeyedStateHandleTest.java | 36 ++---
 ...le.java => TestingRelativeFileStateHandle.java} | 22 +-
 .../runtime/state/TestingStreamStateHandle.java|  8 +++-
 6 files changed, 83 insertions(+), 79 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 2f274dab3d8..79752468045 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.DiscardRecordedStateObject;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -75,6 +76,7 @@ import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -137,7 +139,6 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -2942,13 +2943,15 @@ class CheckpointCoordinatorTest extends TestLogger {
 streamStateHandle
 instanceof 
PlaceholderStreamStateHandle)
 .isFalse();
-verify(streamStateHandle, 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+streamStateHandle, 
TernaryBoolean.FALSE);
 ++sharedHandleCount;
 }
 
 for (HandleAndLocalPath handleAndLocalPath :
 
incrementalKeyedStateHandle.getPrivateState()) {
-verify(handleAndLocalPath.getHandle(), 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
 }
 
 
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
@@ -2971,7 +2974,8 @@ class CheckpointCoordinatorTest extends TestLogger {
 // by CP1
 for (List cpList : sharedHandlesByCheckpoint) {
 for (HandleAndLocalPath handleAndLocalPath : cpList) {
-verify(handleAndLocalPath.getHandle(), 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
 }
 }
 
@@ -3030,14 +3034,19 @@ class CheckpointCoordinatorTest extends TestLogger {
 // references the state from CP1, so we expect they are not 
discarded.
 verifyDiscard(
 sharedHandlesByCheckpoint,
-cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? 
times(1) : never());
+cpId ->
+restoreMode == RestoreMode.CLAIM && cpId == 0
+? TernaryBoolean.TRUE
+: TernaryBoolean.FALSE);
 
 // discard CP2
 

[flink] 02/04: [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles

2023-08-05 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d00e2eb73aa5f2d2506370b5fbe9c8867cbec987
Author: wangfeifan 
AuthorDate: Mon Jul 24 14:44:02 2023 +0800

[refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks 
for state handles
---
 .../checkpoint/CheckpointCoordinatorTest.java  | 47 --
 .../checkpoint/metadata/CheckpointTestUtils.java   | 13 +++---
 ...Handle.java => DiscardRecordedStateObject.java} | 36 +++--
 .../IncrementalRemoteKeyedStateHandleTest.java | 36 ++---
 ...le.java => TestingRelativeFileStateHandle.java} | 22 +-
 .../runtime/state/TestingStreamStateHandle.java|  8 +++-
 6 files changed, 83 insertions(+), 79 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6fa95fb5380..4eedbc60134 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.DiscardRecordedStateObject;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -73,6 +74,7 @@ import 
org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -141,7 +143,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -2940,13 +2941,15 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 streamStateHandle
 instanceof 
PlaceholderStreamStateHandle)
 .isFalse();
-verify(streamStateHandle, 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+streamStateHandle, 
TernaryBoolean.FALSE);
 ++sharedHandleCount;
 }
 
 for (HandleAndLocalPath handleAndLocalPath :
 
incrementalKeyedStateHandle.getPrivateState()) {
-verify(handleAndLocalPath.getHandle(), 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
 }
 
 
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
@@ -2969,7 +2972,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 // by CP1
 for (List cpList : sharedHandlesByCheckpoint) {
 for (HandleAndLocalPath handleAndLocalPath : cpList) {
-verify(handleAndLocalPath.getHandle(), 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
 }
 }
 
@@ -3027,14 +3031,19 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 // references the state from CP1, so we expect they are not 
discarded.
 verifyDiscard(
 sharedHandlesByCheckpoint,
-cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? 
times(1) : never());
+cpId ->
+restoreMode == RestoreMode.CLAIM && cpId == 0
+? TernaryBoolean.TRUE
+: TernaryBoolean.FALSE);
 

[flink] 02/04: [refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks for state handles

2023-08-05 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 46fa6a03062b90c460141aa275fa06347bc876da
Author: wangfeifan 
AuthorDate: Mon Jul 24 14:44:02 2023 +0800

[refactor][test] Introduce DiscardRecordedStateObject to avoid using mocks 
for state handles
---
 .../checkpoint/CheckpointCoordinatorTest.java  | 47 --
 .../checkpoint/metadata/CheckpointTestUtils.java   | 13 +++---
 ...Handle.java => DiscardRecordedStateObject.java} | 36 +++--
 .../IncrementalRemoteKeyedStateHandleTest.java | 36 ++---
 ...le.java => TestingRelativeFileStateHandle.java} | 22 +-
 .../runtime/state/TestingStreamStateHandle.java|  8 +++-
 6 files changed, 83 insertions(+), 79 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index baf4527ba88..83c82d6acfb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.DiscardRecordedStateObject;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -74,6 +75,7 @@ import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -135,7 +137,6 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -2940,13 +2941,15 @@ class CheckpointCoordinatorTest extends TestLogger {
 streamStateHandle
 instanceof 
PlaceholderStreamStateHandle)
 .isFalse();
-verify(streamStateHandle, 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+streamStateHandle, 
TernaryBoolean.FALSE);
 ++sharedHandleCount;
 }
 
 for (HandleAndLocalPath handleAndLocalPath :
 
incrementalKeyedStateHandle.getPrivateState()) {
-verify(handleAndLocalPath.getHandle(), 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
 }
 
 
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
@@ -2969,7 +2972,8 @@ class CheckpointCoordinatorTest extends TestLogger {
 // by CP1
 for (List cpList : sharedHandlesByCheckpoint) {
 for (HandleAndLocalPath handleAndLocalPath : cpList) {
-verify(handleAndLocalPath.getHandle(), 
never()).discardState();
+DiscardRecordedStateObject.verifyDiscard(
+handleAndLocalPath.getHandle(), 
TernaryBoolean.FALSE);
 }
 }
 
@@ -3028,14 +3032,19 @@ class CheckpointCoordinatorTest extends TestLogger {
 // references the state from CP1, so we expect they are not 
discarded.
 verifyDiscard(
 sharedHandlesByCheckpoint,
-cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? 
times(1) : never());
+cpId ->
+restoreMode == RestoreMode.CLAIM && cpId == 0
+? TernaryBoolean.TRUE
+: TernaryBoolean.FALSE);
 
 // discard CP2