This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 78e6005b5dac7d6a140f98b6c5b941914d907f60
Author: wangyang0918 <danrtsey...@alibaba-inc.com>
AuthorDate: Wed Feb 16 16:19:34 2022 +0800

    [FLINK-26142] Add e2e test and enable in github actions
---
 .github/workflows/ci.yml                    |  38 ++++++++++-
 e2e-tests/data/cr.yaml                      |  93 ++++++++++++++++++++++++++
 e2e-tests/test_kubernetes_application_ha.sh |  62 +++++++++++++++++
 e2e-tests/utils.sh                          | 100 ++++++++++++++++++++++++++++
 4 files changed, 292 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7b3c557..1d299ab 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -65,4 +65,40 @@ jobs:
       - name: Stop the operator
         run: |
           helm uninstall flink-operator
-
+  e2e_ci:
+    runs-on: ubuntu-latest
+    name: e2e_ci
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 11
+        uses: actions/setup-java@v2
+        with:
+          java-version: '11'
+          distribution: 'adopt'
+      - name: Build with Maven
+        run: mvn clean install
+      - name: Start minikube
+        uses: medyagh/setup-minikube@master
+      - name: Install cert-manager
+        run: |
+          kubectl get pods -A
+          kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
+          kubectl -n cert-manager wait --all=true --for=condition=Ready --timeout=300s pod
+      - name: Build image
+        run: |
+          export SHELL=/bin/bash
+          export DOCKER_BUILDKIT=1
+          eval $(minikube -p minikube docker-env)
+          docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .
+          docker images
+      - name: Start the operator
+        run: |
+          helm install flink-operator helm/flink-operator --set image.repository=flink-kubernetes-operator --set image.tag=ci-latest
+          kubectl wait --for=condition=Available --timeout=120s deploy/flink-operator
+          kubectl get pods
+      - name: Run Flink e2e tests
+        run: |
+          ./e2e-tests/test_kubernetes_application_ha.sh
+      - name: Stop the operator
+        run: |
+          helm uninstall flink-operator
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
new file mode 100644
index 0000000..cb04df1
--- /dev/null
+++ b/e2e-tests/data/cr.yaml
@@ -0,0 +1,93 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: flink-example-statemachine
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  flinkConfiguration:
+    kubernetes.service-account: flink-operator
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: file:///opt/flink/volume/flink-ha
+    state.checkpoints.dir: file:///opt/flink/volume/flink-cp
+    state.savepoints.dir: file:///opt/flink/volume/flink-sp
+  podTemplate:
+    apiVersion: v1
+    kind: Pod
+    metadata:
+      name: pod-template
+    spec:
+      initContainers:
+        - name: artifacts-fetcher
+          image: busybox:latest
+          imagePullPolicy: IfNotPresent
+          # Use wget or other tools to get user jars from remote storage
+          command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar', '-O', '/flink-artifact/myjob.jar' ]
+          volumeMounts:
+            - mountPath: /flink-artifact
+              name: flink-artifact
+      containers:
+        # Do not change the main container name
+        - name: flink-main-container
+          resources:
+            requests:
+              ephemeral-storage: 2048Mi
+            limits:
+              ephemeral-storage: 2048Mi
+          volumeMounts:
+            - mountPath: /opt/flink/usrlib
+              name: flink-artifact
+            - mountPath: /opt/flink/volume
+              name: flink-volume
+      volumes:
+        - name: flink-artifact
+          emptyDir: { }
+        - name: flink-volume
+          persistentVolumeClaim:
+            claimName: flink-example-statemachine
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "1024m"
+      cpu: 0.5
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "1024m"
+      cpu: 0.5
+  job:
+    jarURI: local:///opt/flink/usrlib/myjob.jar
+    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
+    parallelism: 2
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: flink-example-statemachine
+spec:
+  accessModes:
+    - ReadWriteOnce
+  volumeMode: Filesystem
+  resources:
+    requests:
+      storage: 1Gi
diff --git a/e2e-tests/test_kubernetes_application_ha.sh b/e2e-tests/test_kubernetes_application_ha.sh
new file mode 100755
index 0000000..b91eab4
--- /dev/null
+++ b/e2e-tests/test_kubernetes_application_ha.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/utils.sh
+
+CLUSTER_ID="flink-example-statemachine"
+TIMEOUT=300
+
+function cleanup_and_exit() {
+    if [ $TRAPPED_EXIT_CODE != 0 ];then
+        debug_and_show_logs
+    fi
+
+    kubectl delete -f e2e-tests/data/cr.yaml
+    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
+    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
+}
+
+on_exit cleanup_and_exit
+
+retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
+
+retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
+
+kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
+
+echo "Waiting for jobmanager pod ${jm_pod_name} ready."
+kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1
+
+wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
+
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+
+job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+
+# Kill the JobManager
+echo "Kill the $jm_pod_name"
+kubectl exec $jm_pod_name -- /bin/sh -c "kill 1"
+
+# Check the new JobManager recovering from latest successful checkpoint
+wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+
+echo "Successfully run the Flink Kubernetes application HA test"
+
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
new file mode 100644
index 0000000..70bc56b
--- /dev/null
+++ b/e2e-tests/utils.sh
@@ -0,0 +1,100 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+function wait_for_logs {
+    local jm_pod_name=$1
+    local successful_response_regex=$2
+    local timeout=$3
+
+    # wait or timeout until the log shows up
+    echo "Waiting for log \"$2\"..."
+    for i in $(seq 1 ${timeout}); do
+        if kubectl logs $jm_pod_name | grep -E "${successful_response_regex}" >/dev/null; then
+            echo "Log \"$2\" shows up."
+            return
+        fi
+
+        sleep 1
+    done
+    echo "Log $2 does not show up within a timeout of ${timeout} sec"
+    exit 1
+}
+
+function retry_times() {
+    local retriesNumber=$1
+    local backoff=$2
+    local command="$3"
+
+    for i in $(seq 1 ${retriesNumber})
+    do
+        if ${command}; then
+            return 0
+        fi
+
+        echo "Command: ${command} failed. Retrying..."
+        sleep ${backoff}
+    done
+
+    echo "Command: ${command} failed ${retriesNumber} times."
+    return 1
+}
+
+function debug_and_show_logs {
+    echo "Debugging failed e2e test:"
+    echo "Currently existing Kubernetes resources"
+    kubectl get all
+    kubectl describe all
+
+    echo "Flink logs:"
+    kubectl get pods -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | while read pod;do
+        echo "Current logs for $pod: "
+        kubectl logs $pod;
+        restart_count=$(kubectl get pod $pod -o jsonpath='{.status.containerStatuses[0].restartCount}')
+        if [[ ${restart_count} -gt 0 ]];then
+            echo "Previous logs for $pod: "
+            kubectl logs $pod --previous
+        fi
+    done
+}
+
+function _on_exit_callback {
+    # Export the exit code so that it could be used by the callback commands
+    export TRAPPED_EXIT_CODE=$?
+    # Un-register the callback, to avoid multiple invocations: some shells may treat some signals as subset of others.
+    trap "" INT EXIT
+    # Fast exit, if there is another keyboard interrupt.
+    trap "exit -1" INT
+
+    for command in "${_on_exit_commands[@]-}"; do
+        eval "${command}"
+    done
+}
+
+# Register for multiple signals: some shells interpret them as mutually exclusive.
+trap _on_exit_callback INT EXIT
+
+# Helper method to register a command that should be called on current script exit.
+# It allows to have multiple "on exit" commands to be called, compared to the built-in `trap "$command" EXIT`.
+# Note: tests should not use `trap $command INT|EXIT` directly, to avoid having "Highlander" situation.
+function on_exit {
+    local command="$1"
+
+    # Keep commands in reverse order, so commands would be executed in LIFO order.
+    _on_exit_commands=("${command}" "${_on_exit_commands[@]-}")
+}
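
To reproduce the new e2e test outside of GitHub Actions, the e2e_ci job above can be replayed against a local minikube cluster. The commands below are a minimal sketch assembled from the workflow steps in this commit; they assume minikube is already running, that Docker, Helm, Maven and kubectl are installed, and that they are executed from the repository root.

    # Assumption: a running minikube cluster and the repository root as working directory.
    # Build the operator and bake the image into minikube's Docker daemon.
    mvn clean install
    eval $(minikube -p minikube docker-env)
    docker build -f ./Dockerfile -t flink-kubernetes-operator:ci-latest .

    # Install cert-manager, deploy the operator via Helm, then run the HA test and clean up.
    kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
    kubectl -n cert-manager wait --all=true --for=condition=Ready --timeout=300s pod
    helm install flink-operator helm/flink-operator --set image.repository=flink-kubernetes-operator --set image.tag=ci-latest
    kubectl wait --for=condition=Available --timeout=120s deploy/flink-operator
    ./e2e-tests/test_kubernetes_application_ha.sh
    helm uninstall flink-operator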