This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch FLINK-34324
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0aa57a0878d704496a1dbd9f1c5495c68faa3ff3
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Wed Jan 31 15:02:24 2024 +0100

    [FLINK-34324][test] Makes all s3 related operations being declared and 
called in a single location
---
 .../test-scripts/test_file_sink.sh                 | 111 +++++++++++----------
 1 file changed, 59 insertions(+), 52 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b6672..5ed1fda2c68 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,53 +20,16 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: 
$OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
+# OUTPUT_PATH is a local folder that can be used as a download folder for 
remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
 mkdir -p $OUTPUT_PATH
 
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${OUTPUT_PATH}"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -79,9 +42,6 @@ 
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
 #   sorted content of part files
 ###################################
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
-  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
@@ -89,20 +49,67 @@ function get_complete_result {
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###################################
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-    get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  # the s3 context requires additional setup
+  source "$(dirname "$0")"/common_s3.sh
+  s3_setup hadoop
+
+  # overwrites JOB_OUTPUT_PATH to point to S3
+  S3_DATA_PREFIX="${RANDOM_PREFIX}"
+  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
+
+  # overwrites implementation for local runs
+  function get_complete_result {
+    # copies the data from S3 to the local OUTPUT_PATH
+    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" 
"$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true
+
+    # and prints the sorted output
+    find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort 
-g
+  }
+
+  # overwrites implementation for local runs
+  function get_total_number_of_valid_lines {
+    s3_get_number_of_lines_by_prefix "${FILE_SINK_TEST_TEMP_SUBFOLDER}" "part-"
+  }
+
+  # make sure we delete the file at the end
+  function out_cleanup {
+    s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
+    s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
+    rollback_openssl_lib
+  }
+
+  on_exit out_cleanup
+else
+  echo "[ERROR] Unknown output type: ${OUT_TYPE}"
+  exit 1
+fi
+
+# randomly set up openSSL with dynamically/statically linked libraries
+OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
+echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
+
+set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+set_config_key "metrics.fetcher.update-interval" "2000"
+# this test relies on global failovers
+set_config_key "jobmanager.execution.failover-strategy" "full"
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+
 ###################################
 # Waits until a number of values have been written within a timeout.
 # If the timeout expires, exit with return code 1.

Reply via email to