This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6be277cd [FLINK-33006] add e2e for flink operator ha
6be277cd is described below

commit 6be277cd0c7421f4822c49296198f4a34b2cd721
Author: Peter Huang <hpe...@apple.com>
AuthorDate: Fri Jan 26 07:50:50 2024 -0800

    [FLINK-33006] add e2e for flink operator ha
---
 .github/workflows/ci.yml            | 14 +++++++-
 e2e-tests/test_dynamic_config.sh    |  6 ++--
 e2e-tests/test_flink_operator_ha.sh | 71 +++++++++++++++++++++++++++++++++++++
 e2e-tests/utils.sh                  | 65 +++++++++++++++++++++++++--------
 4 files changed, 139 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3734b59b..d3970cf4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -87,6 +87,7 @@ jobs:
           - test_sessionjob_operations.sh
           - test_multi_sessionjob.sh
           - test_autoscaler.sh
+          - test_flink_operator_ha.sh
         include:
           - namespace: flink
            extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"'
@@ -113,12 +114,18 @@ jobs:
             test: test_autoscaler.sh
           - version: v1_15
             test: test_dynamic_config.sh
+          - version: v1_15
+            test: test_flink_operator_ha.sh
           - version: v1_16
             test: test_autoscaler.sh
           - version: v1_16
             test: test_dynamic_config.sh
+          - version: v1_16
+            test: test_flink_operator_ha.sh
           - version: v1_17
             test: test_dynamic_config.sh
+          - version: v1_17
+            test: test_flink_operator_ha.sh
           - version: v1_15
             java-version: 17
           - version: v1_16
@@ -164,9 +171,14 @@ jobs:
           docker images
       - name: Start the operator
         run: |
+          if [[ "${{ matrix.test }}" == "test_flink_operator_ha.sh" ]]; then
+            sed -i "s/# kubernetes.operator.leader-election.enabled: 
false/kubernetes.operator.leader-election.enabled: true/" 
helm/flink-kubernetes-operator/conf/flink-conf.yaml
+            sed -i "s/# kubernetes.operator.leader-election.lease-name: 
flink-operator-lease/kubernetes.operator.leader-election.lease-name: 
flink-operator-lease/" helm/flink-kubernetes-operator/conf/flink-conf.yaml
+            sed -i "s/replicas: 1/replicas: 2/" 
helm/flink-kubernetes-operator/values.yaml
+          fi 
          helm --debug install flink-kubernetes-operator -n ${{ matrix.namespace }} helm/flink-kubernetes-operator --set image.repository=flink-kubernetes-operator --set image.tag=ci-latest ${{ matrix.extraArgs }}
          kubectl wait --for=condition=Available --timeout=120s -n ${{ matrix.namespace }} deploy/flink-kubernetes-operator
-          kubectl get pods
+          kubectl get pods -n ${{ matrix.namespace }}
       - name: Run Flink e2e tests
         run: |
           sed -i "s/image: flink:.*/image: ${{ matrix.image }}/" 
e2e-tests/data/*.yaml
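
For context, the sed substitutions above are expected to switch on operator leader election in the packaged config before helm install runs. A minimal local check, illustrative only and not part of this commit:

    # Confirm the substitutions enabled leader election in the packaged flink-conf.yaml
    grep -E "kubernetes.operator.leader-election" helm/flink-kubernetes-operator/conf/flink-conf.yaml
    # expected, based on the sed commands above:
    #   kubernetes.operator.leader-election.enabled: true
    #   kubernetes.operator.leader-election.lease-name: flink-operator-lease

    # The chart should now request two operator replicas so a standby can take over
    grep "replicas:" helm/flink-kubernetes-operator/values.yaml
    # expected: replicas: 2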
diff --git a/e2e-tests/test_dynamic_config.sh b/e2e-tests/test_dynamic_config.sh
index 26dfef3d..3569d2c3 100644
--- a/e2e-tests/test_dynamic_config.sh
+++ b/e2e-tests/test_dynamic_config.sh
@@ -28,11 +28,13 @@ on_exit operator_cleanup_and_exit
 
 TIMEOUT=360
 
-operator_namespace=${get_operator_pod_namespace}
+operator_namespace=$(get_operator_pod_namespace)
+operator_pod=$(get_operator_pod_name)
+echo "Current operator pod is ${operator_pod}"
 create_namespace dynamic
 
 kubectl config set-context --current --namespace="${operator_namespace}"
 patch_flink_config '{"data": {"flink-conf.yaml": 
"kubernetes.operator.watched.namespaces: default,flink,dynamic"}}'
-wait_for_operator_logs "Setting default configuration to 
{kubernetes.operator.watched.namespaces=default,flink,dynamic}" ${TIMEOUT} || 
exit 1
+wait_for_operator_logs "${operator_pod}" "Setting default configuration to 
{kubernetes.operator.watched.namespaces=default,flink,dynamic}" ${TIMEOUT} || 
exit 1
 
 echo "Successfully run the dynamic property test"
diff --git a/e2e-tests/test_flink_operator_ha.sh b/e2e-tests/test_flink_operator_ha.sh
new file mode 100644
index 00000000..4ed40d41
--- /dev/null
+++ b/e2e-tests/test_flink_operator_ha.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This script tests operator HA:
+# 1. Deploy a new Flink deployment and wait for the job manager to come up
+# 2. Verify the operator log on the existing leader
+# 3. Delete the leader operator pod
+# 4. Verify that the new leader is different from the old one
+# 5. Check the operator log for the Flink deployment on the new leader
+
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+CLUSTER_ID="flink-example-statemachine"
+APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml"
+TIMEOUT=300
+
+on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
+
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
+
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
+wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+
+job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+
+
+# Verify operator status
+operator_namespace=$(get_operator_pod_namespace)
+display_current_lease_info
+old_operator_leader=$(find_operator_pod_with_leadership)
+
+echo "Current operator pod with leadership is ${old_operator_leader}"
+wait_for_operator_logs "${old_operator_leader}" 
".default/flink-example-statemachine. Resource fully reconciled, nothing to do" 
${TIMEOUT} || exit 1
+
+# Delete the leader operator pod
+delete_operator_pod_with_leadership
+
+# Wait for 20 seconds for leader election
+sleep 20
+display_current_lease_info
+new_operator_leader=$(find_operator_pod_with_leadership)
+echo "Current operator pod with leadership is ${new_operator_leader}"
+
+if [ "${new_operator_leader}" == "${old_operator_leader}" ];then
+  echo "The new operator pod with leadership is the same as old operator pod. 
New operator pod haven't acquire leadership"
+  exit 1
+fi
+
+wait_for_operator_logs "${new_operator_leader}" 
".default/flink-example-statemachine. Resource fully reconciled, nothing to do" 
${TIMEOUT} || exit 1
+echo "Successfully run the Flink Kubernetes application HA test in the new 
operator leader"
\ No newline at end of file
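
The new script can also be exercised outside CI. A rough sketch, assuming a local test cluster (e.g. kind or minikube) where the operator image has been built and installed with leader election enabled and two replicas, as in the CI step above:

    # Run the operator HA e2e test from the repository root;
    # kubectl must point at the cluster where the operator is running.
    cd e2e-tests
    bash ./test_flink_operator_ha.sh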
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index e3f5b741..fe48acca 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -37,16 +37,16 @@ function wait_for_logs {
 }
 
 function wait_for_operator_logs {
-  local successful_response_regex=$1
-  local timeout=$2
-  operator_pod_name=$(get_operator_pod_name)
+  local operator_pod_name=$1
+  local successful_response_regex=$2
+  local timeout=$3
   operator_pod_namespace=$(get_operator_pod_namespace)
 
     # wait or timeout until the log shows up
-  echo "Waiting for operator log \"$1\"..."
+  echo "Waiting for operator log \"$2\"..."
   for i in $(seq 1 ${timeout}); do
-    if kubectl logs $operator_pod_name -c flink-kubernetes-operator -n "${operator_pod_namespace}" | grep -E "${successful_response_regex}" >/dev/null; then
-      echo "Log \"$1\" shows up."
+    if kubectl logs "${operator_pod_name}" -c flink-kubernetes-operator -n "${operator_pod_namespace}" | grep -E "${successful_response_regex}" >/dev/null; then
+      echo "Log \"$2\" shows up."
       return
     fi
 
@@ -124,12 +124,14 @@ function wait_for_jobmanager_running() {
 }
 
 function get_operator_pod_namespace() {
-    operator_pod_namespace=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o jsonpath='{..metadata.namespace}' --all-namespaces)
-    if [ "$(grep -c . <<<"${operator_pod_namespace}")" != 1 ]; then
-      echo "Invalid operator pod namespace: ${operator_pod_namespace}" >&2
+    # Returns multiple space-separated namespaces if there are multiple operator instances in HA mode
+    operator_pod_namespaces=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o jsonpath='{..metadata.namespace}' --all-namespaces)
+    operator_pod_namespace_array=(${operator_pod_namespaces})
+    if [ "$(grep -c . <<<"${operator_pod_namespace_array[0]}")" != 1 ]; then
+      echo "Invalid operator pod namespace: ${operator_pod_namespace_array[0]}" >&2
       exit 1
     fi
-    echo "${operator_pod_namespace}"
+    echo "${operator_pod_namespace_array[0]}"
 }
 
 function get_operator_pod_name() {
@@ -184,17 +186,52 @@ function patch_flink_config() {
  kubectl patch cm flink-operator-config -n "${operator_pod_namespace}" --type merge -p "${patch}"
 }
 
+function display_current_lease_info() {
+  operator_pod_namespace=$(get_operator_pod_namespace)
+  lease=$(kubectl get lease flink-operator-lease -o yaml -n "${operator_pod_namespace}")
+  echo "Current lease content: ${lease}"
+}
+
+function find_operator_pod_with_leadership() {
+  operator_pod_namespace=$(get_operator_pod_namespace)
+  active_pod_name=$(kubectl get lease flink-operator-lease -o jsonpath='{..spec.holderIdentity}' -n "${operator_pod_namespace}")
+  if [ "$(grep -c . <<<"${active_pod_name}")" != 1 ]; then
+    echo "Invalid leader operator pod name: ${active_pod_name}" >&2
+    exit 1
+  fi
+  echo "${active_pod_name}"
+}
+
+function delete_operator_pod_with_leadership() {
+  active_pod_name=$(find_operator_pod_with_leadership)
+  echo "Leader Operator Pod is ${active_pod_name}"
+  kubectl delete pod "${active_pod_name}" -n "${operator_pod_namespace}"
+  echo "Leader Operator Pod ${active_pod_name} is deleted"
+}
+
 function debug_and_show_logs {
     echo "Debugging failed e2e test:"
     echo "Currently existing Kubernetes resources"
     kubectl get all
     kubectl describe all
 
-    echo "Operator logs:"
     operator_pod_namespace=$(get_operator_pod_namespace)
-    operator_pod_name=$(get_operator_pod_name)
-    echo "Operator namespace: ${operator_pod_namespace} pod: 
${operator_pod_name}"
-    kubectl logs -n "${operator_pod_namespace}" "${operator_pod_name}"
+    operator_pod_names=$(get_operator_pod_name)
+    echo "Currently existing Kubernetes resources of operator namespace"
+    kubectl get all -n "${operator_pod_namespace}"
+    kubectl describe all -n "${operator_pod_namespace}"
+
+    operator_pod_namespaces_array=(${operator_pod_names})
+    length=${#operator_pod_namespaces_array[@]}
+
+    # There are two operator pods in HA mode
+    for (( i=0; i<${length}; i++ ));
+    do
+      echo "Operator ${operator_pod_namespaces_array[$i]} logs:"
+      echo "Operator namespace: ${operator_pod_namespace} pod: 
${operator_pod_namespaces_array[$i]}}"
+
+      kubectl logs -n "${operator_pod_namespace}" 
"${operator_pod_namespaces_array[$i]}"
+    done
 
     echo "Flink logs:"
    kubectl get pods -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | while read pod;do
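
As a usage note, the leader lookup that find_operator_pod_with_leadership wraps can be reproduced by hand. A minimal sketch, assuming the lease name flink-operator-lease configured in the CI step above:

    # Namespace of the operator pods (first match, mirroring get_operator_pod_namespace)
    ns=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" \
         -o jsonpath='{..metadata.namespace}' --all-namespaces | awk '{print $1}')
    # The Lease's holderIdentity is the pod name of the current operator leader
    kubectl get lease flink-operator-lease -n "$ns" -o jsonpath='{.spec.holderIdentity}'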
