This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c531b370 [FLINK-32111] Add Null  check for checkpoints history obj
c531b370 is described below

commit c531b3701a5e1e7ea51d37143979c27f1c88f78f
Author: Tamir Sagi <tamirs...@users.noreply.github.com>
AuthorDate: Fri May 19 18:43:15 2023 +0300

    [FLINK-32111] Add Null  check for checkpoints history obj
---
 docs/content/docs/operations/plugins.md            |  2 +-
 .../operator/service/CheckpointHistoryWrapper.java |  2 +-
 .../operator/service/NativeFlinkServiceTest.java   | 23 ++++++++++++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/operations/plugins.md b/docs/content/docs/operations/plugins.md
index efbf34e7..b25f0559 100644
--- a/docs/content/docs/operations/plugins.md
+++ b/docs/content/docs/operations/plugins.md
@@ -107,7 +107,7 @@ Similar to custom validator implementations, resource listeners are loaded via t
 In order to enable your custom `FlinkResourceListener` you need to:
 
  1. Implement the interface
- 2. Add your listener class to `org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` in `META-INF/services`
+ 2. Add your listener class to `org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener` in `META-INF/services`
  3. Package your JAR and add it to the plugins directory of your operator image (`/opt/flink/plugins`)
 
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
index f53d0742..99a9abcd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java
@@ -57,7 +57,7 @@ public class CheckpointHistoryWrapper implements ResponseBody {
     private ArrayNode history;
 
     public Optional<PendingCheckpointInfo> getInProgressCheckpoint() {
-        if (history.isEmpty()) {
+        if (history == null || history.isEmpty()) {
             return Optional.empty();
         }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index d7239018..805cf582 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -66,11 +66,13 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -307,6 +309,27 @@ public class NativeFlinkServiceTest {
         objectMapper.readValue(flink15Response, CheckpointHistoryWrapper.class);
     }
 
+    @Test
+    public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails()
+            throws JsonProcessingException {
+        ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+        String response =
+                "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}";
+        var checkpointHistoryWrapper =
+                objectMapper.readValue(response, CheckpointHistoryWrapper.class);
+        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+                assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+        assertTrue(optionalPendingCheckpointInfo.isEmpty());
+    }
+
+    @Test
+    public void testGetInProgressCheckpointsWithoutHistory() {
+        CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper();
+        Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo =
+                assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint);
+        assertTrue(optionalPendingCheckpointInfo.isEmpty());
+    }
+
     @Test
     public void testClusterInfoRestCompatibility() throws JsonProcessingException {
         ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

Reply via email to