This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new c531b370 [FLINK-32111] Add Null check for checkpoints history obj c531b370 is described below commit c531b3701a5e1e7ea51d37143979c27f1c88f78f Author: Tamir Sagi <tamirs...@users.noreply.github.com> AuthorDate: Fri May 19 18:43:15 2023 +0300 [FLINK-32111] Add Null check for checkpoints history obj --- docs/content/docs/operations/plugins.md | 2 +- .../operator/service/CheckpointHistoryWrapper.java | 2 +- .../operator/service/NativeFlinkServiceTest.java | 23 ++++++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/docs/content/docs/operations/plugins.md b/docs/content/docs/operations/plugins.md index efbf34e7..b25f0559 100644 --- a/docs/content/docs/operations/plugins.md +++ b/docs/content/docs/operations/plugins.md @@ -107,7 +107,7 @@ Similar to custom validator implementations, resource listeners are loaded via t In order to enable your custom `FlinkResourceListener` you need to: 1. Implement the interface - 2. Add your listener class to `org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` in `META-INF/services` + 2. Add your listener class to `org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener` in `META-INF/services` 3. Package your JAR and add it to the plugins directory of your operator image (`/opt/flink/plugins`) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java index f53d0742..99a9abcd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java @@ -57,7 +57,7 @@ public class CheckpointHistoryWrapper implements ResponseBody { private ArrayNode history; public Optional<PendingCheckpointInfo> getInProgressCheckpoint() { - if (history.isEmpty()) { + if (history == null || history.isEmpty()) { return Optional.empty(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index d7239018..805cf582 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -66,11 +66,13 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Arrays; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -307,6 +309,27 @@ public class NativeFlinkServiceTest { objectMapper.readValue(flink15Response, CheckpointHistoryWrapper.class); } + @Test + public void testGetInProgressCheckpointsFromResponseWithoutHistoryDetails() + throws JsonProcessingException { + ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + String response = + "{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0}}"; + var checkpointHistoryWrapper = + objectMapper.readValue(response, CheckpointHistoryWrapper.class); + Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo = + assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint); + assertTrue(optionalPendingCheckpointInfo.isEmpty()); + } + + @Test + public void testGetInProgressCheckpointsWithoutHistory() { + CheckpointHistoryWrapper checkpointHistoryWrapper = new CheckpointHistoryWrapper(); + Optional<CheckpointHistoryWrapper.PendingCheckpointInfo> optionalPendingCheckpointInfo = + assertDoesNotThrow(checkpointHistoryWrapper::getInProgressCheckpoint); + assertTrue(optionalPendingCheckpointInfo.isEmpty()); + } + @Test public void testClusterInfoRestCompatibility() throws JsonProcessingException { ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();