This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cda368dc9607aa0e799514b19dde54b4b1e502e0 Author: Robert Metzger <rmetz...@apache.org> AuthorDate: Tue Feb 11 16:44:45 2020 +0100 [hotfix][e2e] harden elasticsearch test --- flink-end-to-end-tests/test-scripts/common.sh | 47 +++++++++++++++++++--- .../test-scripts/test_streaming_elasticsearch.sh | 25 +++++++++++- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index ed56b3b..0b9d611 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -247,7 +247,9 @@ function wait_rest_endpoint_up { echo "Waiting for ${endpoint_name} REST endpoint to come up..." sleep 1 done - echo "${endpoint_name} REST endpoint has not started within a timeout of ${TIMEOUT} sec" + echo "${endpoint_name} REST endpoint has not started on query url '${query_url}' within a timeout of ${TIMEOUT} sec. curl output:" + curl ${CURL_SSL_ARGS} "$query_url" + echo "Exiting ..." exit 1 } @@ -437,16 +439,51 @@ function wait_for_job_state_transition { done } -function wait_job_running { +function is_job_submitted { + JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list -a | grep "$1") + if [[ "$JOB_LIST_RESULT" == "" ]]; then + echo "false" + else + echo "true" + fi +} + +function wait_job_submitted { local TIMEOUT=10 for i in $(seq 1 ${TIMEOUT}); do - JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list -r | grep "$1") + local IS_SUBMITTED=`is_job_submitted $1` - if [[ "$JOB_LIST_RESULT" == "" ]]; then - echo "Job ($1) is not yet running." + if [[ "$IS_SUBMITTED" == "true" ]]; then + echo "Job ($1) is submitted." + return else + echo "Job ($1) is not yet submitted." + fi + sleep 1 + done + echo "Job ($1) has not been submitted within a timeout of ${TIMEOUT} sec" + exit 1 +} + +function is_job_running { + JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list -r | grep "$1") + if [[ "$JOB_LIST_RESULT" == "" ]]; then + echo "false" + else + echo "true" + fi +} + +function wait_job_running { + local TIMEOUT=10 + for i in $(seq 1 ${TIMEOUT}); do + local IS_RUNNING=`is_job_running $1` + + if [[ "$IS_RUNNING" == "true" ]]; then echo "Job ($1) is running." return + else + echo "Job ($1) is not yet running." fi sleep 1 done diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh index e2ee273..adabb47 100755 --- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh +++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh @@ -39,10 +39,31 @@ on_exit test_cleanup TEST_ES_JAR=${END_TO_END_DIR}/flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar # run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \ +JOB_ID=$($FLINK_DIR/bin/flink run -d -p 1 $TEST_ES_JAR \ --numRecords 20 \ --index index \ - --type type + --type type | awk '{print $NF}' | tail -n 1) + +# wait for 10 seconds +wait_job_submitted ${JOB_ID} + +# Wait for 60 seconds for the job to finish +MAX_RETRY_SECONDS=60 + +start_time=$(date +%s) + +RUNNING=`is_job_running ${JOB_ID}` +while [[ "$RUNNING" == "true" ]]; do + RUNNING=`is_job_running ${JOB_ID}` + current_time=$(date +%s) + time_diff=$((current_time - start_time)) + if [ $time_diff -ge $MAX_RETRY_SECONDS ]; then + echo "Job did not finish after $MAX_RETRY_SECONDS seconds. Printing logs and failing test: " + cat $FLINK_DIR/log/* + exit 1 + fi +done + # 40 index requests and 20 final update requests verify_result_line_number 60 index