This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new a4dd58545d5 Revert "[FLINK-34324][test] Makes all s3 related 
operations being declared and called in a single location"
a4dd58545d5 is described below

commit a4dd58545d59b59089d9321a743d6c98a7c8e855
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Tue Feb 6 17:32:22 2024 +0100

    Revert "[FLINK-34324][test] Makes all s3 related operations being declared 
and called in a single location"
    
    This reverts commit a15515ebc0e4c59ea0642e745e942591c28b3a3c.
---
 .../test-scripts/test_file_sink.sh                 | 111 ++++++++++-----------
 1 file changed, 52 insertions(+), 59 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh 
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 5ed1fda2c68..711f74b6672 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,16 +20,53 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
+S3_PREFIX=temp/test_file_sink-$(uuidgen)
+OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
+S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-# OUTPUT_PATH is a local folder that can be used as a download folder for 
remote data
-# the helper functions will access this folder
-RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
-OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
+if [ "${OUT_TYPE}" == "s3" ]; then
+  source "$(dirname "$0")"/common_s3.sh
+else
+  echo "S3 environment is not loaded for non-s3 test runs (test run type: 
$OUT_TYPE)."
+fi
+
+# randomly set up openSSL with dynamically/statically linked libraries
+OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
+echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
+
+s3_setup hadoop
+set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+set_config_key "metrics.fetcher.update-interval" "2000"
+# this test relies on global failovers
+set_config_key "jobmanager.execution.failover-strategy" "full"
+
 mkdir -p $OUTPUT_PATH
 
-# JOB_OUTPUT_PATH is the location where the job writes its data to
-JOB_OUTPUT_PATH="${OUTPUT_PATH}"
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "Use local output"
+  JOB_OUTPUT_PATH=${OUTPUT_PATH}
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  echo "Use s3 output"
+  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
+  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
+  mkdir -p "$OUTPUT_PATH-chk"
+else
+  echo "Unknown output type: ${OUT_TYPE}"
+  exit 1
+fi
+
+# make sure we delete the file at the end
+function out_cleanup {
+  s3_delete_by_full_path_prefix "$S3_PREFIX"
+  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
+  rollback_openssl_lib
+}
+if [ "${OUT_TYPE}" == "s3" ]; then
+  on_exit out_cleanup
+fi
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -42,6 +79,9 @@ JOB_OUTPUT_PATH="${OUTPUT_PATH}"
 #   sorted content of part files
 ###################################
 function get_complete_result {
+  if [ "${OUT_TYPE}" == "s3" ]; then
+    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" 
"part-" true
+  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
@@ -49,67 +89,20 @@ function get_complete_result {
 # Get total number of lines in part files.
 #
 # Globals:
-#   OUTPUT_PATH
+#   S3_PREFIX
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###################################
 function get_total_number_of_valid_lines {
-  get_complete_result | wc -l | tr -d '[:space:]'
+  if [ "${OUT_TYPE}" == "local" ]; then
+    get_complete_result | wc -l | tr -d '[:space:]'
+  elif [ "${OUT_TYPE}" == "s3" ]; then
+    s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
+  fi
 }
 
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "[INFO] Test run in local environment: No S3 environment is not loaded."
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  # the s3 context requires additional
-  source "$(dirname "$0")"/common_s3.sh
-  s3_setup hadoop
-
-  # overwrites JOB_OUTPUT_PATH to point to S3
-  S3_DATA_PREFIX="${RANDOM_PREFIX}"
-  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
-  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
-  set_config_key "state.checkpoints.dir" 
"s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
-
-  # overwrites implementation for local runs
-  function get_complete_result {
-    # copies the data from S3 to the local OUTPUT_PATH
-    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" 
"$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true
-
-    # and prints the sorted output
-    find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort 
-g
-  }
-
-  # overwrites implementation for local runs
-  function get_total_number_of_valid_lines {
-    s3_get_number_of_lines_by_prefix "${FILE_SINK_TEST_TEMP_SUBFOLDER}" "part-"
-  }
-
-  # make sure we delete the file at the end
-  function out_cleanup {
-    s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
-    s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
-    rollback_openssl_lib
-  }
-
-  on_exit out_cleanup
-else
-  echo "[ERROR] Unknown out type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo 
"static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection 
between 'dynamic' and 'static')"
-
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
-
 ###################################
 # Waits until a number of values have been written within a timeout.
 # If the timeout expires, exit with return code 1.

Reply via email to