This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6a4e6a5 [FLINK-28008] Can not get secondary resource from context after operator restart 6a4e6a5 is described below commit 6a4e6a5edb386675fede2790b81e31cdbcb2952c Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Fri Jun 24 10:28:02 2022 +0200 [FLINK-28008] Can not get secondary resource from context after operator restart --- e2e-tests/test_sessionjob_operations.sh | 13 +++++++++++ .../flink/kubernetes/operator/FlinkOperator.java | 13 ++--------- .../operator/config/FlinkConfigManager.java | 3 ++- .../operator/utils/EventSourceUtils.java | 26 ++++++++++++++++------ .../src/main/resources/log4j2.properties | 3 +++ pom.xml | 2 +- 6 files changed, 40 insertions(+), 20 deletions(-) diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh index e03604d..a4bbad7 100755 --- a/e2e-tests/test_sessionjob_operations.sh +++ b/e2e-tests/test_sessionjob_operations.sh @@ -28,6 +28,7 @@ TIMEOUT=300 SESSION_CLUSTER_IDENTIFIER="flinkdep/$CLUSTER_ID" SESSION_JOB_NAME="flink-example-statemachine" SESSION_JOB_IDENTIFIER="sessionjob/$SESSION_JOB_NAME" +OPERATOR_POD_LABEL="app.kubernetes.io/name=flink-kubernetes-operator" on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID @@ -65,3 +66,15 @@ assert_available_slots 1 $CLUSTER_ID echo "Successfully run the sessionjob savepoint upgrade test" +# Test Operator restart +echo "Delete session job " + $SESSION_JOB_NAME +kubectl delete flinksessionjob $SESSION_JOB_NAME +echo "Killing the operator pod" +kubectl delete pod -l $OPERATOR_POD_LABEL +echo "Submitting the session job again" +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 38ed58f..5e93866 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -57,7 +57,6 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.HashSet; import java.util.Set; -import java.util.UUID; import java.util.concurrent.Executors; /** Main Class for Flink native k8s operator. */ @@ -151,16 +150,8 @@ public class FlinkOperator { } private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) { - // TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259 - String[] watchedNamespaces = - configManager - .getOperatorConfiguration() - .getWatchedNamespaces() - .toArray(String[]::new); - String fakeNs = UUID.randomUUID().toString(); - overrider.settingNamespace(fakeNs); - overrider.addingNamespaces(watchedNamespaces); - overrider.removingNamespaces(fakeNs); + overrider.settingNamespaces( + configManager.getOperatorConfiguration().getWatchedNamespaces()); overrider.withRetry( GenericRetry.fromConfiguration( configManager.getOperatorConfiguration().getRetryConfiguration())); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 8f67978..0895f29 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -122,9 +122,10 @@ public class FlinkConfigManager { && this.defaultConfig.toMap().equals(newConf.toMap())) { LOG.info("Default configuration did not change, nothing to do..."); return; + } else { + LOG.info("Setting default configuration to {}", newConf); } - LOG.info("Updating default configuration to {}", newConf); var oldNs = Optional.ofNullable(this.operatorConfiguration) .map(FlinkOperatorConfiguration::getWatchedNamespaces) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index 4771723..62bea6d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -27,11 +27,13 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** Utility class to locate secondary resources. */ @@ -71,7 +73,7 @@ public class EventSourceUtils { FLINK_DEPLOYMENT_IDX, flinkDeployment -> List.of( - compositeKey( + indexKey( flinkDeployment.getMetadata().getName(), flinkDeployment.getMetadata().getNamespace()))); @@ -82,7 +84,7 @@ public class EventSourceUtils { context.getPrimaryCache() .byIndex( FLINK_DEPLOYMENT_IDX, - compositeKey( + indexKey( sessionJob .getSpec() .getDeploymentName(), @@ -106,7 +108,7 @@ public class EventSourceUtils { FLINK_SESSIONJOB_IDX, sessionJob -> List.of( - compositeKey( + indexKey( sessionJob.getSpec().getDeploymentName(), sessionJob.getMetadata().getNamespace()))); @@ -117,7 +119,7 @@ public class EventSourceUtils { context.getPrimaryCache() .byIndex( FLINK_SESSIONJOB_IDX, - compositeKey( + indexKey( flinkDeployment .getMetadata() .getName(), @@ -127,14 +129,24 @@ public class EventSourceUtils { .stream() .map(ResourceID::fromResource) .collect(Collectors.toSet())) + .withPrimaryToSecondaryMapper( + (PrimaryToSecondaryMapper<FlinkSessionJob>) + sessionJob -> + Set.of( + new ResourceID( + sessionJob + .getSpec() + .getDeploymentName(), + sessionJob + .getMetadata() + .getNamespace()))) .withNamespacesInheritedFromController(context) .followNamespaceChanges(true) .build(); - return new InformerEventSource<>(configuration, context); } - private static String compositeKey(String name, String namespace) { - return String.format("%s_%s", name, namespace); + private static String indexKey(String name, String namespace) { + return name + "#" + namespace; } } diff --git a/flink-kubernetes-operator/src/main/resources/log4j2.properties b/flink-kubernetes-operator/src/main/resources/log4j2.properties index c3d3c15..f7f24f4 100644 --- a/flink-kubernetes-operator/src/main/resources/log4j2.properties +++ b/flink-kubernetes-operator/src/main/resources/log4j2.properties @@ -24,3 +24,6 @@ appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] [%X{resource.namespace}.%X{resource.name}] %msg%n%throwable} + +logger.conf.name = org.apache.flink.configuration.GlobalConfiguration +logger.conf.level = WARN diff --git a/pom.xml b/pom.xml index 052074e..981815f 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ under the License. <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version> <git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version> - <operator.sdk.version>3.0.2</operator.sdk.version> + <operator.sdk.version>3.0.3</operator.sdk.version> <operator.sdk.admission-controller.version>0.2.0</operator.sdk.admission-controller.version> <fabric8.version>5.12.2</fabric8.version>