This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch FLINK-34324 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0aa57a0878d704496a1dbd9f1c5495c68faa3ff3 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Wed Jan 31 15:02:24 2024 +0100 [FLINK-34324][test] Makes all s3 related operations being declared and called in a single location --- .../test-scripts/test_file_sink.sh | 111 +++++++++++---------- 1 file changed, 59 insertions(+), 52 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 711f74b6672..5ed1fda2c68 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -20,53 +20,16 @@ OUT_TYPE="${1:-local}" # other type: s3 SINK_TO_TEST="${2:-"StreamingFileSink"}" -S3_PREFIX=temp/test_file_sink-$(uuidgen) -OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX" -S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" source "$(dirname "$0")"/common.sh -if [ "${OUT_TYPE}" == "s3" ]; then - source "$(dirname "$0")"/common_s3.sh -else - echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)." 
-fi - -# randomly set up openSSL with dynamically/statically linked libraries -OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) -echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" - -s3_setup hadoop -set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" -set_config_key "metrics.fetcher.update-interval" "2000" -# this test relies on global failovers -set_config_key "jobmanager.execution.failover-strategy" "full" - +# OUTPUT_PATH is a local folder that can be used as a download folder for remote data +# the helper functions will access this folder +RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)" +OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}" mkdir -p $OUTPUT_PATH -if [ "${OUT_TYPE}" == "local" ]; then - echo "Use local output" - JOB_OUTPUT_PATH=${OUTPUT_PATH} -elif [ "${OUT_TYPE}" == "s3" ]; then - echo "Use s3 output" - JOB_OUTPUT_PATH=${S3_OUTPUT_PATH} - set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk" - mkdir -p "$OUTPUT_PATH-chk" -else - echo "Unknown output type: ${OUT_TYPE}" - exit 1 -fi - -# make sure we delete the file at the end -function out_cleanup { - s3_delete_by_full_path_prefix "$S3_PREFIX" - s3_delete_by_full_path_prefix "${S3_PREFIX}-chk" - rollback_openssl_lib -} -if [ "${OUT_TYPE}" == "s3" ]; then - on_exit out_cleanup -fi - -TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" +# JOB_OUTPUT_PATH is the location where the job writes its data to +JOB_OUTPUT_PATH="${OUTPUT_PATH}" ################################### # Get all lines in part files and sort them numerically. @@ -79,9 +42,6 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram. 
# sorted content of part files ################################### function get_complete_result { - if [ "${OUT_TYPE}" == "s3" ]; then - s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true - fi find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } @@ -89,20 +49,67 @@ function get_complete_result { # Get total number of lines in part files. # # Globals: -# S3_PREFIX +# OUTPUT_PATH # Arguments: # None # Returns: # line number in part files ################################### function get_total_number_of_valid_lines { - if [ "${OUT_TYPE}" == "local" ]; then - get_complete_result | wc -l | tr -d '[:space:]' - elif [ "${OUT_TYPE}" == "s3" ]; then - s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" - fi + get_complete_result | wc -l | tr -d '[:space:]' } +if [ "${OUT_TYPE}" == "local" ]; then + echo "[INFO] Test run in local environment: no S3 environment is loaded." +elif [ "${OUT_TYPE}" == "s3" ]; then + # the s3 context requires additional setup + source "$(dirname "$0")"/common_s3.sh + s3_setup hadoop + + # overwrites JOB_OUTPUT_PATH to point to S3 + S3_DATA_PREFIX="${RANDOM_PREFIX}" + S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk" + JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}" + set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}" + + # overwrites implementation for local runs + function get_complete_result { + # copies the data from S3 to the local OUTPUT_PATH + s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_DATA_PREFIX}" "part-" true + + # and prints the sorted output + find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g + } + + # overwrites implementation for local runs + function get_total_number_of_valid_lines { + s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-" + } + + # make sure we delete the file at the end + function out_cleanup { + s3_delete_by_full_path_prefix 
"${S3_DATA_PREFIX}" + s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}" + rollback_openssl_lib + } + + on_exit out_cleanup +else + echo "[ERROR] Unknown output type: ${OUT_TYPE}" + exit 1 +fi + +# randomly set up openSSL with dynamically/statically linked libraries +OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) +echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" + +set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" +set_config_key "metrics.fetcher.update-interval" "2000" +# this test relies on global failovers +set_config_key "jobmanager.execution.failover-strategy" "full" + +TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" + ################################### # Waits until a number of values have been written within a timeout. # If the timeout expires, exit with return code 1.