Repository: flink Updated Branches: refs/heads/master 6d0d366eb -> a666455c9
[FLINK-8973] [E2E] HA end-to-end test with StateMachineExample. Adds an end-to-end test that runs the StateMachineExample on a local cluster with HA enabled. There is a single JM which gets killed and re-created and we check if the new JM picks up the job execution and if at the end the StateMachine has no ALERTs printed. This closes #5750. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a666455c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a666455c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a666455c Branch: refs/heads/master Commit: a666455c98c269c63373e991c6ca2751e132a7c8 Parents: 6d0d366 Author: kkloudas <kklou...@gmail.com> Authored: Thu Mar 15 13:13:46 2018 +0100 Committer: Timo Walther <twal...@apache.org> Committed: Tue Apr 3 11:57:33 2018 +0200 ---------------------------------------------------------------------- flink-end-to-end-tests/run-nightly-tests.sh | 9 + flink-end-to-end-tests/test-scripts/common.sh | 106 +++++++++- flink-end-to-end-tests/test-scripts/test_ha.sh | 209 +++++++++++++++++++ flink-examples/flink-examples-streaming/pom.xml | 8 +- .../statemachine/StateMachineExample.java | 37 +++- 5 files changed, 362 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 1ece1db..714dd2d 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -47,6 +47,15 @@ EXIT_CODE=0 # EXIT_CODE=$? # fi + +if [ $EXIT_CODE == 0 ]; then + printf "\n==============================================================================\n" + printf "Running HA end-to-end test\n" + printf "==============================================================================\n" + $END_TO_END_DIR/test-scripts/test_ha.sh + EXIT_CODE=$? +fi + if [ $EXIT_CODE == 0 ]; then printf "\n==============================================================================\n" printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n" http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/test-scripts/common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index d4b9126..0db735a 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -39,6 +39,101 @@ cd $TEST_ROOT export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N) echo "TEST_DATA_DIR: $TEST_DATA_DIR" +function revert_default_config() { + + # revert our modifications to the masters file + if [ -f $FLINK_DIR/conf/masters.bak ]; then + rm $FLINK_DIR/conf/masters + mv $FLINK_DIR/conf/masters.bak $FLINK_DIR/conf/masters + fi + + # revert our modifications to the Flink conf yaml + if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then + rm $FLINK_DIR/conf/flink-conf.yaml + mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml + fi +} + +function create_ha_config() { + + # back up the masters and flink-conf.yaml + cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak + cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak + + # clean up the dir that will be used for zookeeper storage + # (see high-availability.zookeeper.storageDir below) + if [ -e $TEST_DATA_DIR/recovery ]; then + echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..." + rm -rf $TEST_DATA_DIR/recovery + fi + + # create the masters file (only one currently). + # This must have all the masters to be used in HA. + echo "localhost:8081" > ${FLINK_DIR}/conf/masters + + # then move on to create the flink-conf.yaml + sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL + #============================================================================== + # Common + #============================================================================== + + jobmanager.rpc.address: localhost + jobmanager.rpc.port: 6123 + jobmanager.heap.mb: 1024 + taskmanager.heap.mb: 1024 + taskmanager.numberOfTaskSlots: 4 + parallelism.default: 1 + + #============================================================================== + # High Availability + #============================================================================== + + high-availability: zookeeper + high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/ + high-availability.zookeeper.quorum: localhost:2181 + high-availability.zookeeper.path.root: /flink + high-availability.cluster-id: /test_cluster_one + + #============================================================================== + # Web Frontend + #============================================================================== + + web.port: 8081 +EOL +} + +function start_ha_cluster { + create_ha_config + start_local_zk + start_cluster +} + +function start_local_zk { + # Parses the zoo.cfg and starts locally zk. + + # This is almost the same code as the + # /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost. + + while read server ; do + server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim + + # match server.id=address[:port[:port]] + if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then + id=${BASH_REMATCH[1]} + address=${BASH_REMATCH[2]} + + if [ "${address}" != "localhost" ]; then + echo "[ERROR] Parse error. Only available for localhost." + PASS="" + exit 1 + fi + ${FLINK_DIR}/bin/zookeeper.sh start $id + else + echo "[WARN] Parse error. Skipping config entry '$server'." + fi + done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg") +} + function start_cluster { "$FLINK_DIR"/bin/start-cluster.sh @@ -62,6 +157,11 @@ function start_cluster { function stop_cluster { "$FLINK_DIR"/bin/stop-cluster.sh + # stop zookeeper only if there are processes running + if ! [ `jps | grep 'FlinkZooKeeperQuorumPeer' | wc -l` -eq 0 ]; then + "$FLINK_DIR"/bin/zookeeper.sh stop + fi + if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \ | grep -v "RetriableCommitFailedException" \ | grep -v "NoAvailableBrokersException" \ @@ -107,8 +207,6 @@ function stop_cluster { cat $FLINK_DIR/log/*.out PASS="" fi - - rm $FLINK_DIR/log/* } function wait_job_running { @@ -201,7 +299,9 @@ function s3_delete { # make sure to clean up even in case of failures function cleanup { stop_cluster - rm -r $TEST_DATA_DIR check_all_pass + rm -rf $TEST_DATA_DIR + rm $FLINK_DIR/log/* + revert_default_config } trap cleanup EXIT http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-end-to-end-tests/test-scripts/test_ha.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_ha.sh b/flink-end-to-end-tests/test-scripts/test_ha.sh new file mode 100755 index 0000000..2e65504 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_ha.sh @@ -0,0 +1,209 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=$FLINK_DIR/examples/streaming/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2 + +JM_WATCHDOG_PID=0 +TM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { + if [ ${CLEARED} -eq 0 ]; then + + if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then + echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" + kill ${JM_WATCHDOG_PID} 2> /dev/null + wait ${JM_WATCHDOG_PID} 2> /dev/null + fi + + if ! [ ${TM_WATCHDOG_PID} -eq 0 ]; then + echo "Killing TM watchdog @ ${TM_WATCHDOG_PID}" + kill ${TM_WATCHDOG_PID} 2> /dev/null + wait ${TM_WATCHDOG_PID} 2> /dev/null + fi + + cleanup + CLEARED=1 + fi +} + +function verify_logs() { + local OUTPUT=$1 + local JM_FAILURES=$2 + + # verify that we have no alerts + if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then + echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate." + PASS="" + fi + + # checks that all apart from the first JM recover the failed jobgraph. + if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then + echo "FAILURE: A JM did not take over." + PASS="" + fi + + # search the logs for JMs that log completed checkpoints + if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then + echo "FAILURE: A JM did not execute the job." + PASS="" + fi + + if [[ ! "$PASS" ]]; then + echo "One or more tests FAILED." + exit 1 + fi +} + +function jm_watchdog() { + local EXPECTED_JMS=$1 + local IP_PORT=$2 + + while true; do + local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; + local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) + for (( c=0; c<MISSING_JMS; c++ )); do + "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT} + done + sleep 5; + done +} + +function kill_jm { + local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1` + local JM_PIDS=(${JM_PIDS[@]}) + local PID=${JM_PIDS[0]} + kill -9 ${PID} + + echo "Killed JM @ ${PID}" +} + +function tm_watchdog() { + local JOB_ID=$1 + local EXPECTED_TMS=$2 + + # the number of already seen successful checkpoints + local SUCCESSFUL_CHCKP=0 + + while true; do + + # check how many successful checkpoints we have + # and kill a TM only if the previous one already had some + + local CHECKPOINTS=`curl -s "http://localhost:8081/jobs/${JOB_ID}/checkpoints" | cut -d ":" -f 6 | sed 's/,.*//'` + + if [[ ${CHECKPOINTS} =~ '^[0-9]+$' ]] || [[ ${CHECKPOINTS} == "" ]]; then + + # this may be the case during leader election. + # in this case we retry later with a smaller interval + sleep 5; continue + + elif [ "${CHECKPOINTS}" -ne "${SUCCESSFUL_CHCKP}" ]; then + + # we are not only searching for > because when the JM goes down, + # the job starts with reporting 0 successful checkpoints + + local RUNNING_TMS=`jps | grep 'TaskManager' | wc -l` + local TM_PIDS=`jps | grep 'TaskManager' | cut -d " " -f 1` + + local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS)) + if [ ${MISSING_TMS} -eq 0 ]; then + # start a new TM only if we have exactly the expected number + "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null + fi + + # kill an existing one + local TM_PIDS=(${TM_PIDS[@]}) + local PID=${TM_PIDS[0]} + kill -9 ${PID} + + echo "Killed TM @ ${PID}" + + SUCCESSFUL_CHCKP=${CHECKPOINTS} + fi + + sleep 11; + done +} + +function run_ha_test() { + local PARALLELISM=$1 + local BACKEND=$2 + local ASYNC=$3 + local INCREM=$4 + local OUTPUT=$5 + + local JM_KILLS=3 + local CHECKPOINT_DIR="${TEST_DATA_DIR}/checkpoints/" + + CLEARED=0 + + # start the cluster on HA mode + start_ha_cluster + + echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}." + + # submit a job in detached mode and let it run + local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \ + $TEST_PROGRAM_JAR \ + --backend ${BACKEND} \ + --checkpoint-dir "file://${CHECKPOINT_DIR}" \ + --async-checkpoints ${ASYNC} \ + --incremental-checkpoints ${INCREM} \ + --output ${OUTPUT} | grep "Job has been submitted with JobID" | sed 's/.* //g') + + wait_job_running ${JOB_ID} + + # start the watchdog that keeps the number of JMs stable + jm_watchdog 1 "8081" & + JM_WATCHDOG_PID=$! + echo "Running JM watchdog @ ${JM_WATCHDOG_PID}" + + sleep 5 + + # start the watchdog that keeps the number of TMs stable + tm_watchdog ${JOB_ID} 1 & + TM_WATCHDOG_PID=$! + echo "Running TM watchdog @ ${TM_WATCHDOG_PID}" + + # let the job run for a while to take some checkpoints + sleep 20 + + for (( c=0; c<${JM_KILLS}; c++ )); do + # kill the JM and wait for watchdog to + # create a new one which will take over + kill_jm + sleep 60 + done + + verify_logs ${OUTPUT} ${JM_KILLS} + + # kill the cluster and zookeeper + stop_cluster_and_watchdog +} + +trap stop_cluster_and_watchdog EXIT +run_ha_test 4 "file" "false" "false" "${TEST_DATA_DIR}/output.txt" +run_ha_test 4 "rocks" "false" "false" "${TEST_DATA_DIR}/output.txt" +run_ha_test 4 "file" "true" "false" "${TEST_DATA_DIR}/output.txt" +run_ha_test 4 "rocks" "false" "true" "${TEST_DATA_DIR}/output.txt" http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-examples/flink-examples-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index ea253d8..6ff5512 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -84,7 +84,13 @@ under the License. <type>test-jar</type> </dependency> - </dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/flink/blob/a666455c/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index 14757fb..054ed0a 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -23,6 +23,9 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -55,6 +58,12 @@ public class StateMachineExample { System.out.println("Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]"); System.out.println("Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]"); + System.out.println("Options for both the above setups: "); + System.out.println("\t[--backend <file|rocks>]"); + System.out.println("\t[--checkpoint-dir <filepath>]"); + System.out.println("\t[--async-checkpoints <true|false>]"); + System.out.println("\t[--incremental-checkpoints <true|false>]"); + System.out.println("\t[--output <filepath> OR null for stdout]"); System.out.println(); // ---- determine whether to use the built-in source, or read from Kafka ---- @@ -92,7 +101,23 @@ public class StateMachineExample { // create the environment to create streams and configure execution final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); + env.enableCheckpointing(2000L); + + final String stateBackend = params.get("backend", "memory"); + if ("file".equals(stateBackend)) { + final String checkpointDir = params.get("checkpoint-dir"); + boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false); + env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints)); + } else if ("rocks".equals(stateBackend)) { + final String checkpointDir = params.get("checkpoint-dir"); + boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false); + env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints)); + } + + final String outputFile = params.get("output"); + + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(params); DataStream<Event> events = env.addSource(source); @@ -105,7 +130,13 @@ public class StateMachineExample { .flatMap(new StateMachineMapper()); // output the alerts to std-out - alerts.print(); + if (outputFile == null) { + alerts.print(); + } else { + alerts + .writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE) + .setParallelism(1); + } // trigger program execution env.execute("State machine job"); @@ -140,7 +171,7 @@ public class StateMachineExample { state = State.Initial; } - // ask the state machine what state we should go to based on teh given event + // ask the state machine what state we should go to based on the given event State nextState = state.transition(evt.type()); if (nextState == State.InvalidTransition) {