This is an automated email from the ASF dual-hosted git repository. wangyang0918 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new aae96d0c9d1 [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent aae96d0c9d1 is described below commit aae96d0c9d1768c396bdf2ee6510677fbb8f317a Author: wangyang0918 <danrtsey...@alibaba-inc.com> AuthorDate: Mon Aug 15 23:03:19 2022 +0800 [FLINK-28265][k8s] Make KubernetesStateHandleStore#addEntry idempotent This closes #20590. --- .../KubernetesStateHandleStore.java | 16 +++-- .../KubernetesStateHandleStoreTest.java | 75 ++++++++++++++++++++++ 2 files changed, 87 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java index 0716b58ec34..42f07003207 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java @@ -648,10 +648,12 @@ public class KubernetesStateHandleStore<T extends Serializable> private Optional<KubernetesConfigMap> addEntry( KubernetesConfigMap configMap, String key, byte[] serializedStateHandle) throws Exception { - final String content = configMap.getData().get(key); - if (content != null) { + final String oldBase64Content = configMap.getData().get(key); + final String newBase64Content = toBase64(serializedStateHandle); + if (oldBase64Content != null) { try { - final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content); + final StateHandleWithDeleteMarker<T> stateHandle = + deserializeStateHandle(oldBase64Content); if (stateHandle.isMarkedForDeletion()) { // This might be a left-over after the fail-over. As the remove operation is // idempotent let's try to finish it. @@ -660,6 +662,12 @@ public class KubernetesStateHandleStore<T extends Serializable> "Unable to remove the marked as deleting entry."); } } else { + // It could happen that the kubernetes client retries a transaction that has + // already succeeded due to network issues. So we simply ignore when the + // new content is same as the existing one. + if (oldBase64Content.equals(newBase64Content)) { + return Optional.of(configMap); + } throw getKeyAlreadyExistException(key); } } catch (IOException e) { @@ -668,7 +676,7 @@ public class KubernetesStateHandleStore<T extends Serializable> logInvalidEntry(key, configMapName, e); } } - configMap.getData().put(key, toBase64(serializedStateHandle)); + configMap.getData().put(key, newBase64Content); return Optional.of(configMap); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java index e246dda45a9..2d58f93a8fe 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.highavailability; import org.apache.flink.api.common.JobID; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.StateHandleWithDeleteMarker; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; @@ -27,10 +28,13 @@ import org.apache.flink.runtime.persistence.PossibleInconsistentStateException; import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.persistence.StringResourceVersion; import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.FunctionUtils; +import io.fabric8.kubernetes.client.KubernetesClientException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +43,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Predicate; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -222,6 +230,45 @@ class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase }; } + @Test + void testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final FlinkKubeClient anotherFlinkKubeClient = + createFlinkKubeClientBuilder() + .setCheckAndUpdateConfigMapFunction( + (configMapName, function) -> + retryWithFirstFailedK8sOperation( + function, getLeaderConfigMap())) + .build(); + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + store.addAndLock(key, state); + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize()) + .isEqualTo(1); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount()) + .isEqualTo(0); + assertThat(store.getAllHandles()).hasSize(1); + assertThat(store.getAllHandles()).contains(key); + assertThat(store.getAndLock(key).retrieveState().getValue()) + .isEqualTo(state.getValue()); + }); + } + }; + } + @Test void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception { new Context() { @@ -1143,4 +1190,32 @@ class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase configMap.getData().put(key, deleting); return state; } + + private static CompletableFuture<Boolean> retryWithFirstFailedK8sOperation( + Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function, + KubernetesConfigMap leaderConfigMap) { + final AtomicInteger callbackInvocationCount = new AtomicInteger(0); + final CompletableFuture<Boolean> result = + FutureUtils.retry( + () -> + CompletableFuture.supplyAsync( + () -> { + callbackInvocationCount.incrementAndGet(); + function.apply(leaderConfigMap); + if (callbackInvocationCount.get() == 1) { + throw new KubernetesClientException( + "Expected exception to simulate unstable " + + "kubernetes client operation"); + } + return true; + }), + KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES + .defaultValue(), + t -> + ExceptionUtils.findThrowable(t, KubernetesClientException.class) + .isPresent(), + Executors.newDirectExecutorService()); + assertThat(callbackInvocationCount.get()).isEqualTo(2); + return result; + } }