This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6be277cd [FLINK-33006] add e2e for flink operator ha 6be277cd is described below commit 6be277cd0c7421f4822c49296198f4a34b2cd721 Author: Peter Huang <hpe...@apple.com> AuthorDate: Fri Jan 26 07:50:50 2024 -0800 [FLINK-33006] add e2e for flink operator ha --- .github/workflows/ci.yml | 14 +++++++- e2e-tests/test_dynamic_config.sh | 6 ++-- e2e-tests/test_flink_operator_ha.sh | 71 +++++++++++++++++++++++++++++++++++++ e2e-tests/utils.sh | 65 +++++++++++++++++++++++++-------- 4 files changed, 139 insertions(+), 17 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3734b59b..d3970cf4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,6 +87,7 @@ jobs: - test_sessionjob_operations.sh - test_multi_sessionjob.sh - test_autoscaler.sh + - test_flink_operator_ha.sh include: - namespace: flink extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"' @@ -113,12 +114,18 @@ jobs: test: test_autoscaler.sh - version: v1_15 test: test_dynamic_config.sh + - version: v1_15 + test: test_flink_operator_ha.sh - version: v1_16 test: test_autoscaler.sh - version: v1_16 test: test_dynamic_config.sh + - version: v1_16 + test: test_flink_operator_ha.sh - version: v1_17 test: test_dynamic_config.sh + - version: v1_17 + test: test_flink_operator_ha.sh - version: v1_15 java-version: 17 - version: v1_16 @@ -164,9 +171,14 @@ jobs: docker images - name: Start the operator run: | + if [[ "${{ matrix.test }}" == "test_flink_operator_ha.sh" ]]; then + sed -i "s/# kubernetes.operator.leader-election.enabled: false/kubernetes.operator.leader-election.enabled: true/" helm/flink-kubernetes-operator/conf/flink-conf.yaml + sed -i "s/# kubernetes.operator.leader-election.lease-name: flink-operator-lease/kubernetes.operator.leader-election.lease-name: flink-operator-lease/" helm/flink-kubernetes-operator/conf/flink-conf.yaml + sed -i "s/replicas: 1/replicas: 2/" 
helm/flink-kubernetes-operator/values.yaml + fi helm --debug install flink-kubernetes-operator -n ${{ matrix.namespace }} helm/flink-kubernetes-operator --set image.repository=flink-kubernetes-operator --set image.tag=ci-latest ${{ matrix.extraArgs }} kubectl wait --for=condition=Available --timeout=120s -n ${{ matrix.namespace }} deploy/flink-kubernetes-operator - kubectl get pods + kubectl get pods -n ${{ matrix.namespace }} - name: Run Flink e2e tests run: | sed -i "s/image: flink:.*/image: ${{ matrix.image }}/" e2e-tests/data/*.yaml diff --git a/e2e-tests/test_dynamic_config.sh b/e2e-tests/test_dynamic_config.sh index 26dfef3d..3569d2c3 100644 --- a/e2e-tests/test_dynamic_config.sh +++ b/e2e-tests/test_dynamic_config.sh @@ -28,11 +28,13 @@ on_exit operator_cleanup_and_exit TIMEOUT=360 -operator_namespace=${get_operator_pod_namespace} +operator_namespace=$(get_operator_pod_namespace) +operator_pod=$(get_operator_pod_name) +echo "Current operator pod is ${operator_pod}" create_namespace dynamic kubectl config set-context --current --namespace="${operator_namespace}" patch_flink_config '{"data": {"flink-conf.yaml": "kubernetes.operator.watched.namespaces: default,flink,dynamic"}}' -wait_for_operator_logs "Setting default configuration to {kubernetes.operator.watched.namespaces=default,flink,dynamic}" ${TIMEOUT} || exit 1 +wait_for_operator_logs "${operator_pod}" "Setting default configuration to {kubernetes.operator.watched.namespaces=default,flink,dynamic}" ${TIMEOUT} || exit 1 echo "Successfully run the dynamic property test" diff --git a/e2e-tests/test_flink_operator_ha.sh b/e2e-tests/test_flink_operator_ha.sh new file mode 100644 index 00000000..4ed40d41 --- /dev/null +++ b/e2e-tests/test_flink_operator_ha.sh @@ -0,0 +1,71 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################### + +# This script tests the operator HA: +# 1. Deploy a new flink deployment and wait for job manager to come up +# 2. Verify the operator log on existing leader +# 3. Delete the leader operator pod +# 4. Verify the new leader is different from the old one +# 5. Check operator log for the flink deployment in the new leader + +SCRIPT_DIR=$(dirname "$(readlink -f "$0")") +source "${SCRIPT_DIR}/utils.sh" + +CLUSTER_ID="flink-example-statemachine" +APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml" +TIMEOUT=300 + +on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID + +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) + +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + +job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}') + + +# Verify operator status +operator_namespace=$(get_operator_pod_namespace) 
+display_current_lease_info +old_operator_leader=$(find_operator_pod_with_leadership) + +echo "Current operator pod with leadership is ${old_operator_leader}" +wait_for_operator_logs "${old_operator_leader}" ".default/flink-example-statemachine. Resource fully reconciled, nothing to do" ${TIMEOUT} || exit 1 + +# Delete the leader operator pod +delete_operator_pod_with_leadership + +# Wait for 20 seconds for leader election +sleep 20 +display_current_lease_info +new_operator_leader=$(find_operator_pod_with_leadership) +echo "Current operator pod with leadership is ${new_operator_leader}" + +if [ "${new_operator_leader}" == "${old_operator_leader}" ];then + echo "The new operator pod with leadership is the same as the old operator pod. The new operator pod hasn't acquired leadership" + exit 1 +fi + +wait_for_operator_logs "${new_operator_leader}" ".default/flink-example-statemachine. Resource fully reconciled, nothing to do" ${TIMEOUT} || exit 1 +echo "Successfully run the Flink Kubernetes application HA test in the new operator leader" \ No newline at end of file diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh index e3f5b741..fe48acca 100755 --- a/e2e-tests/utils.sh +++ b/e2e-tests/utils.sh @@ -37,16 +37,16 @@ function wait_for_logs { } function wait_for_operator_logs { - local successful_response_regex=$1 - local timeout=$2 - operator_pod_name=$(get_operator_pod_name) + local operator_pod_name=$1 + local successful_response_regex=$2 + local timeout=$3 operator_pod_namespace=$(get_operator_pod_namespace) # wait or timeout until the log shows up - echo "Waiting for operator log \"$1\"..." + echo "Waiting for operator log \"$2\"..." for i in $(seq 1 ${timeout}); do - if kubectl logs $operator_pod_name -c flink-kubernetes-operator -n "${operator_pod_namespace}" | grep -E "${successful_response_regex}" >/dev/null; then - echo "Log \"$1\" shows up." 
+ if kubectl logs "${operator_pod_name}" -c flink-kubernetes-operator -n "${operator_pod_namespace}" | grep -E "${successful_response_regex}" >/dev/null; then + echo "Log \"$2\" shows up." return fi @@ -124,12 +124,14 @@ function wait_for_jobmanager_running() { } function get_operator_pod_namespace() { - operator_pod_namespace=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o jsonpath='{..metadata.namespace}' --all-namespaces) - if [ "$(grep -c . <<<"${operator_pod_namespace}")" != 1 ]; then - echo "Invalid operator pod namespace: ${operator_pod_namespace}" >&2 + # It will return multiple namespaces separated by whitespace if there are multiple operator instances in HA mode + operator_pod_namespaces=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o jsonpath='{..metadata.namespace}' --all-namespaces) + operator_pod_namespace_array=(${operator_pod_namespaces}) + if [ "$(grep -c . <<<"${operator_pod_namespace_array[0]}")" != 1 ]; then + echo "Invalid operator pod namespace: ${operator_pod_namespace_array[0]}" >&2 exit 1 fi - echo "${operator_pod_namespace}" + echo "${operator_pod_namespace_array[0]}" } function get_operator_pod_name() { @@ -184,17 +186,52 @@ function patch_flink_config() { kubectl patch cm flink-operator-config -n "${operator_pod_namespace}" --type merge -p "${patch}" } +function display_current_lease_info() { + operator_pod_namespace=$(get_operator_pod_namespace) + lease=$(kubectl get lease flink-operator-lease -o yaml -n "${operator_pod_namespace}") + echo "Current lease content: ${lease}" } + +function find_operator_pod_with_leadership() { + operator_pod_namespace=$(get_operator_pod_namespace) + active_pod_name=$(kubectl get lease flink-operator-lease -o jsonpath='{..spec.holderIdentity}' -n "${operator_pod_namespace}") + if [ "$(grep -c . 
<<<"${active_pod_name}")" != 1 ]; then + echo "Invalid leader operator pod name: ${active_pod_name}" >&2 + exit 1 + fi + echo "${active_pod_name}" +} + +function delete_operator_pod_with_leadership() { + active_pod_name=$(find_operator_pod_with_leadership) + echo "Leader Operator Pod is ${active_pod_name}" + kubectl delete pod "${active_pod_name}" -n "${operator_pod_namespace}" + echo "Leader Operator Pod ${active_pod_name} is deleted" +} + function debug_and_show_logs { echo "Debugging failed e2e test:" echo "Currently existing Kubernetes resources" kubectl get all kubectl describe all - echo "Operator logs:" operator_pod_namespace=$(get_operator_pod_namespace) - operator_pod_name=$(get_operator_pod_name) - echo "Operator namespace: ${operator_pod_namespace} pod: ${operator_pod_name}" - kubectl logs -n "${operator_pod_namespace}" "${operator_pod_name}" + operator_pod_names=$(get_operator_pod_name) + echo "Currently existing Kubernetes resources of operator namespace" + kubectl get all -n "${operator_pod_namespace}" + kubectl describe all -n "${operator_pod_namespace}" + + operator_pod_namespaces_array=(${operator_pod_names}) + length=${#operator_pod_namespaces_array[@]} + + # There are two operator pods in HA mode + for (( i=0; i<${length}; i++ )); + do + echo "Operator ${operator_pod_namespaces_array[$i]} logs:" + echo "Operator namespace: ${operator_pod_namespace} pod: ${operator_pod_namespaces_array[$i]}" + + kubectl logs -n "${operator_pod_namespace}" "${operator_pod_namespaces_array[$i]}" + done echo "Flink logs:" kubectl get pods -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | while read pod;do