Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6415#discussion_r205078058
  
    --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
    @@ -0,0 +1,139 @@
    +#!/usr/bin/env bash
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +source "$(dirname "$0")"/common.sh
    +
    +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
    +
    +JM_WATCHDOG_PID=0
    +
    +# flag indicating if we have already cleared up things after a test
    +CLEARED=0
    +
    +function stop_cluster_and_watchdog() {
    +    if [ ${CLEARED} -eq 0 ]; then
    +
    +        if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
    +            echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
    +            kill ${JM_WATCHDOG_PID} 2> /dev/null
    +            wait ${JM_WATCHDOG_PID} 2> /dev/null
    +        fi
    +
    +        CLEARED=1
    +    fi
    +}
    +
    +function verify_logs() {
    +    local OUTPUT=$FLINK_DIR/log/*.out
    +    local JM_FAILURES=$1
    +    local EXIT_CODE=0
    +
    +    # verify that we have no alerts
    +    if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
    +        echo "FAILURE: Alerts found at the general purpose DataSet job."
    +        EXIT_CODE=1
    +    fi
    +
    +    # checks that all apart from the first JM recover the failed jobgraph.
    +    if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
    +        echo "FAILURE: A JM did not take over."
    +        EXIT_CODE=1
    +    fi
    +
    +    if [[ $EXIT_CODE != 0 ]]; then
    +        echo "One or more tests FAILED."
    +        exit $EXIT_CODE
    +    fi
    +}
    +
    +function jm_watchdog() {
    +    local EXPECTED_JMS=$1
    +    local IP_PORT=$2
    +
    +    while true; do
    +        local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`;
    +        local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
    +        for (( c=0; c<MISSING_JMS; c++ )); do
    +            "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
    +        done
    +        sleep 1;
    +    done
    +}
    +
    +function kill_jm {
    +    local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
    +    local JM_PIDS=(${JM_PIDS[@]})
    +    local PID=${JM_PIDS[0]}
    +    kill -9 ${PID}
    +
    +    echo "Killed JM @ ${PID}"
    +}
    +
    +function run_ha_test() {
    +    local PARALLELISM=$1
    +
    +    local JM_KILLS=3
    +
    +    CLEARED=0
    +    mkdir -p ${TEST_DATA_DIR}/control
    +    touch ${TEST_DATA_DIR}/control/test.txt
    +
    +    # start the cluster on HA mode
    +    start_ha_cluster
    +
    +    echo "Running on HA mode: parallelism=${PARALLELISM}."
    +
    +    # submit a job in detached mode and let it run
    +    local JOB_ID=$($FLINK_DIR/bin/flink run -d -p ${PARALLELISM} \
    +        $TEST_PROGRAM_JAR \
    +        --loadFactor 4 \
    +        --outputPath $TEST_DATA_DIR/out/dataset_allround \
    +        --source ${TEST_DATA_DIR}/control/test.txt \
    +        | grep "Job has been submitted with JobID" | sed 's/.* //g')
    +
    +    wait_job_running ${JOB_ID}
    +
    +    # start the watchdog that keeps the number of JMs stable
    +    jm_watchdog 1 "8081" &
    +    JM_WATCHDOG_PID=$!
    +    echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
    +
    +    for (( c=0; c<${JM_KILLS}; c++ )); do
    +        # kill the JM and wait for watchdog to
    +        # create a new one which will take over
    +        kill_jm
    +        sleep 20
    +        wait_job_running ${JOB_ID}
    +    done
    +
    +    printf "STOP" >> ${TEST_DATA_DIR}/control/test.txt
    --- End diff --
    
    We've already verified that the job is properly restarted in the jobmanager-kill loop, so this should only be about shutting down the job.


---

Reply via email to