This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push: new a4dd58545d5 Revert "[FLINK-34324][test] Makes all s3 related operations being declared and called in a single location" a4dd58545d5 is described below commit a4dd58545d59b59089d9321a743d6c98a7c8e855 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Tue Feb 6 17:32:22 2024 +0100 Revert "[FLINK-34324][test] Makes all s3 related operations being declared and called in a single location" This reverts commit a15515ebc0e4c59ea0642e745e942591c28b3a3c. --- .../test-scripts/test_file_sink.sh | 111 ++++++++++----------- 1 file changed, 52 insertions(+), 59 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 5ed1fda2c68..711f74b6672 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -20,16 +20,53 @@ OUT_TYPE="${1:-local}" # other type: s3 SINK_TO_TEST="${2:-"StreamingFileSink"}" +S3_PREFIX=temp/test_file_sink-$(uuidgen) +OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX" +S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" source "$(dirname "$0")"/common.sh -# OUTPUT_PATH is a local folder that can be used as a download folder for remote data -# the helper functions will access this folder -RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)" -OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}" +if [ "${OUT_TYPE}" == "s3" ]; then + source "$(dirname "$0")"/common_s3.sh +else + echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)." +fi + +# randomly set up openSSL with dynamically/statically linked libraries +OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) +echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" + +s3_setup hadoop +set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" +set_config_key "metrics.fetcher.update-interval" "2000" +# this test relies on global failovers +set_config_key "jobmanager.execution.failover-strategy" "full" + mkdir -p $OUTPUT_PATH -# JOB_OUTPUT_PATH is the location where the job writes its data to -JOB_OUTPUT_PATH="${OUTPUT_PATH}" +if [ "${OUT_TYPE}" == "local" ]; then + echo "Use local output" + JOB_OUTPUT_PATH=${OUTPUT_PATH} +elif [ "${OUT_TYPE}" == "s3" ]; then + echo "Use s3 output" + JOB_OUTPUT_PATH=${S3_OUTPUT_PATH} + set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk" + mkdir -p "$OUTPUT_PATH-chk" +else + echo "Unknown output type: ${OUT_TYPE}" + exit 1 +fi + +# make sure we delete the file at the end +function out_cleanup { + s3_delete_by_full_path_prefix "$S3_PREFIX" + s3_delete_by_full_path_prefix "${S3_PREFIX}-chk" + rollback_openssl_lib +} +if [ "${OUT_TYPE}" == "s3" ]; then + on_exit out_cleanup +fi + +TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" ################################### # Get all lines in part files and sort them numerically. @@ -42,6 +79,9 @@ JOB_OUTPUT_PATH="${OUTPUT_PATH}" # sorted content of part files ################################### function get_complete_result { + if [ "${OUT_TYPE}" == "s3" ]; then + s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true + fi find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } @@ -49,67 +89,20 @@ function get_complete_result { # Get total number of lines in part files. # # Globals: -# OUTPUT_PATH +# S3_PREFIX # Arguments: # None # Returns: # line number in part files ################################### function get_total_number_of_valid_lines { - get_complete_result | wc -l | tr -d '[:space:]' + if [ "${OUT_TYPE}" == "local" ]; then + get_complete_result | wc -l | tr -d '[:space:]' + elif [ "${OUT_TYPE}" == "s3" ]; then + s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-" + fi } -if [ "${OUT_TYPE}" == "local" ]; then - echo "[INFO] Test run in local environment: No S3 environment is not loaded." -elif [ "${OUT_TYPE}" == "s3" ]; then - # the s3 context requires additional - source "$(dirname "$0")"/common_s3.sh - s3_setup hadoop - - # overwrites JOB_OUTPUT_PATH to point to S3 - S3_DATA_PREFIX="${RANDOM_PREFIX}" - S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk" - JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}" - set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}" - - # overwrites implementation for local runs - function get_complete_result { - # copies the data from S3 to the local OUTPUT_PATH - s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true - - # and prints the sorted output - find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g - } - - # overwrites implementation for local runs - function get_total_number_of_valid_lines { - s3_get_number_of_lines_by_prefix "${FILE_SINK_TEST_TEMP_SUBFOLDER}" "part-" - } - - # make sure we delete the file at the end - function out_cleanup { - s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}" - s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}" - rollback_openssl_lib - } - - on_exit out_cleanup -else - echo "[ERROR] Unknown out type: ${OUT_TYPE}" - exit 1 -fi - -# randomly set up openSSL with dynamically/statically linked libraries -OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi) -echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')" - -set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}" -set_config_key "metrics.fetcher.update-interval" "2000" -# this test relies on global failovers -set_config_key "jobmanager.execution.failover-strategy" "full" - -TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar" - ################################### # Waits until a number of values have been written within a timeout. # If the timeout expires, exit with return code 1.