GJL closed pull request #6999: [BP-1.6][FLINK-10357][tests] Improve 
StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6999
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
index 8652d72bfe5..17389ad6f4e 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -23,11 +23,6 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/Stream
 
 OUTPUT_PATH="$TEST_DATA_DIR/out"
 
-function get_num_output_files {
-    local num_files=$(find ${OUTPUT_PATH} -type f | wc -l)
-    echo ${num_files}
-}
-
 function wait_for_restart {
     local base_num_restarts=$1
 
@@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###################################
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###################################
-function wait_for_number_of_checkpoints {
-    local job_id=$1
-    local expected_num_checkpoints=$2
-    local timeout=$3
-    local count=0
-
-    echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-    while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+    find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-        if [[ ${count} -gt ${timeout} ]]; then
-            echo "A timeout occurred waiting for successful checkpoints"
+###################################
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###################################
+function wait_for_complete_result {
+    local expected_number_of_values=$1
+    local polling_timeout=$2
+    local polling_interval=1
+    local seconds_elapsed=0
+
+    local number_of_values=0
+    local previous_number_of_values=-1
+
+    while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+        if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
+            echo "Did not produce expected number of values within ${polling_timeout}s"
             exit 1
-        else
-            ((count+=2))
         fi
 
-        local current_num_checkpoints=$(get_completed_number_of_checkpoints ${job_id})
-        echo "${current_num_checkpoints}/${expected_num_checkpoints} completed checkpoints"
-        sleep 2
-    done
-}
-
-function get_completed_number_of_checkpoints {
-    local job_id=$1
-    local json_res=$(curl -s http://localhost:8081/jobs/${job_id}/checkpoints)
+        sleep ${polling_interval}
+        ((seconds_elapsed += ${polling_interval}))
 
-    echo ${json_res}    | # {"counts":{"restored":0,"total":25,"in_progress":1,"completed":24,"failed":0} ...
-        cut -d ":" -f 6 | # 24,"failed"
-        sed 's/,.*//'     # 24
+        number_of_values=$(get_complete_result | wc -l | tr -d '[:space:]')
+        if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
+            echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
+            previous_number_of_values=${number_of_values}
+        fi
+    done
 }
 
 start_cluster
@@ -126,25 +129,13 @@ echo "Starting 2 TMs"
 
 wait_for_restart 1
 
-echo "Waiting until no new files are being created"
-OLD_COUNT=0
-NEW_COUNT=$(get_num_output_files)
-while ! [[ ${OLD_COUNT} -eq ${NEW_COUNT} ]]; do
-    echo "More output files were created. previous=${OLD_COUNT} now=${NEW_COUNT}"
-    # so long as there is data to process new files should be created for each checkpoint
-    CURRENT_NUM_CHECKPOINTS=$(get_completed_number_of_checkpoints ${JOB_ID})
-    EXPECTED_NUM_CHECKPOINTS=$((CURRENT_NUM_CHECKPOINTS + 1))
-    wait_for_number_of_checkpoints ${JOB_ID} ${EXPECTED_NUM_CHECKPOINTS} 60
-
-    OLD_COUNT=${NEW_COUNT}
-    NEW_COUNT=$(get_num_output_files)
-done
+echo "Waiting until all values have been produced"
+wait_for_complete_result 60000 300
 
 cancel_job "${JOB_ID}"
 
 wait_job_terminal_state "${JOB_ID}" "CANCELED"
 
-# get all lines in part files and sort them numerically
-find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g > "${TEST_DATA_DIR}/complete_result"
+get_complete_result > "${TEST_DATA_DIR}/complete_result"
 
 check_result_hash "File Streaming Sink" "$TEST_DATA_DIR/complete_result" "6727342fdd3aae2129e61fc8f433fb6f"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to