This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch pull/19923/head in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd543cb80793e8f347faf73ea8db8406e6700dbc Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Wed Sep 14 18:28:40 2022 +0200 remove bash test logic (but keep packaging tests) --- .../test-scripts/test_sql_client.sh | 203 --------------------- 1 file changed, 203 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index ea6a343589b..a5ca0511e80 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -19,25 +19,6 @@ set -Eeuo pipefail -KAFKA_VERSION="3.2.1" -CONFLUENT_VERSION="6.2.2" -CONFLUENT_MAJOR_VERSION="6.2" -# Check the Confluent Platform <> Apache Kafka compatibility matrix when updating KAFKA_VERSION -KAFKA_SQL_VERSION="universal" -ELASTICSEARCH_VERSION=7 -# we use the smallest version possible -ELASTICSEARCH_MAC_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.2-darwin-x86_64.tar.gz' -ELASTICSEARCH_LINUX_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.2-linux-x86_64.tar.gz' - -source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/kafka_sql_common.sh \ - $KAFKA_VERSION \ - $CONFLUENT_VERSION \ - $CONFLUENT_MAJOR_VERSION \ - $KAFKA_SQL_VERSION -source "$(dirname "$0")"/elasticsearch-common.sh - -SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar SQL_JARS_DIR=$END_TO_END_DIR/flink-sql-client-test/target/sql-jars ################################################################################ @@ -80,187 +61,3 @@ for SQL_JAR in $SQL_JARS_DIR/*.jar; do done rm -r $EXTRACTED_JAR - -################################################################################ -# Prepare connectors -################################################################################ - -ELASTICSEARCH_INDEX='my_users' - -function sql_cleanup() { - stop_kafka_cluster - shutdown_elasticsearch_cluster "$ELASTICSEARCH_INDEX" -} -on_exit sql_cleanup - -function prepare_elasticsearch { - echo "Preparing Elasticsearch (version=$ELASTICSEARCH_VERSION)..." - # elastcisearch offers different release binary file for corresponding system since version 7. - case "$(uname -s)" in - Linux*) OS_TYPE=linux;; - Darwin*) OS_TYPE=mac;; - *) OS_TYPE="UNKNOWN:${unameOut}" - esac - - if [[ "$OS_TYPE" == "mac" ]]; then - DOWNLOAD_URL=$ELASTICSEARCH_MAC_DOWNLOAD_URL - elif [[ "$OS_TYPE" == "linux" ]]; then - DOWNLOAD_URL=$ELASTICSEARCH_LINUX_DOWNLOAD_URL - else - echo "[ERROR] Unsupported OS for Elasticsearch: $OS_TYPE" - exit 1 - fi - - setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION - wait_elasticsearch_working -} - -# prepare Kafka -echo "Preparing Kafka..." - -setup_kafka_dist - -start_kafka_cluster - -create_kafka_json_source test-json -create_kafka_topic 1 1 test-avro - -# prepare Elasticsearch -prepare_elasticsearch - -################################################################################ -# Prepare Flink -################################################################################ - -echo "Preparing Flink..." - -start_cluster -start_taskmanagers 2 - -################################################################################ -# Run SQL statements -################################################################################ - -echo "Testing SQL statements..." - -KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka" ) -ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch$ELASTICSEARCH_VERSION" ) - -# create session environment file -RESULT=$TEST_DATA_DIR/result -INIT_SQL=$TEST_DATA_DIR/sql-client-init.sql - -get_kafka_json_source_schema test-json JsonSourceTable >> $INIT_SQL - -cat >> $INIT_SQL << EOF - - CREATE TABLE ElasticsearchUpsertSinkTable ( - user_id INT, - user_name STRING, - user_count BIGINT, - PRIMARY KEY (user_id) NOT ENFORCED - ) WITH ( - 'connector' = 'elasticsearch-$ELASTICSEARCH_VERSION', - 'hosts' = 'http://localhost:9200', - 'index' = '$ELASTICSEARCH_INDEX', - 'sink.bulk-flush.max-actions' = '1', - 'format' = 'json' - ); - - CREATE TABLE ElasticsearchAppendSinkTable ( - user_id INT, - user_name STRING, - user_count BIGINT - ) WITH ( - 'connector' = 'elasticsearch-$ELASTICSEARCH_VERSION', - 'hosts' = 'http://localhost:9200', - 'index' = '$ELASTICSEARCH_INDEX', - 'sink.bulk-flush.max-actions' = '1', - 'format' = 'json' - ); - - CREATE FUNCTION RegReplace AS 'org.apache.flink.table.toolbox.StringRegexReplaceFunction'; -EOF - -# submit SQL statements - -echo "Executing SQL: Values -> Elasticsearch (upsert)" - -SQL_STATEMENT_1=$(cat << EOF -INSERT INTO ElasticsearchUpsertSinkTable - SELECT user_id, user_name, COUNT(*) AS user_count - FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) - AS UserCountTable(user_id, user_name) - GROUP BY user_id, user_name -EOF -) - -JOB_ID=$($FLINK_DIR/bin/sql-client.sh \ - --jar $KAFKA_SQL_JAR \ - --jar $ELASTICSEARCH_SQL_JAR \ - --jar $SQL_TOOLBOX_JAR \ - --init $INIT_SQL \ - --update "$SQL_STATEMENT_1" | grep "Job ID:" | sed 's/.* //g') - -wait_job_terminal_state "$JOB_ID" "FINISHED" - -verify_result_line_number 3 "$ELASTICSEARCH_INDEX" - -echo "Executing SQL: Values -> Elasticsearch (append, no key)" - -SQL_STATEMENT_2=$(cat << EOF -INSERT INTO ElasticsearchAppendSinkTable - SELECT * - FROM ( - VALUES - (1, 'Bob', CAST(0 AS BIGINT)), - (22, 'Tom', CAST(0 AS BIGINT)), - (42, 'Kim', CAST(0 AS BIGINT)), - (42, 'Kim', CAST(0 AS BIGINT)), - (42, 'Kim', CAST(0 AS BIGINT)), - (1, 'Bob', CAST(0 AS BIGINT))) - AS UserCountTable(user_id, user_name, user_count) -EOF -) - -JOB_ID=$($FLINK_DIR/bin/sql-client.sh \ - --jar $KAFKA_SQL_JAR \ - --jar $ELASTICSEARCH_SQL_JAR \ - --jar $SQL_TOOLBOX_JAR \ - --init $INIT_SQL \ - --update "$SQL_STATEMENT_2" | grep "Job ID:" | sed 's/.* //g') - -wait_job_terminal_state "$JOB_ID" "FINISHED" - -# 3 upsert results and 6 append results -verify_result_line_number 9 "$ELASTICSEARCH_INDEX" - -echo "Executing SQL: Match recognize -> Elasticsearch" - -SQL_STATEMENT_3=$(cat << EOF -INSERT INTO ElasticsearchAppendSinkTable - SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count - FROM ( - SELECT \`user\`, \`rowtime\` - FROM JsonSourceTable - WHERE \`user\` IS NOT NULL) - MATCH_RECOGNIZE ( - ORDER BY rowtime - MEASURES - \`user\` as userName - PATTERN (A) - DEFINE - A as \`user\` = 'Alice' - ) T -EOF -) - -JOB_ID=$($FLINK_DIR/bin/sql-client.sh \ - --jar $KAFKA_SQL_JAR \ - --jar $ELASTICSEARCH_SQL_JAR \ - --jar $SQL_TOOLBOX_JAR \ - --init $INIT_SQL \ - --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g') - -# 3 upsert results and 6 append results and 3 match_recognize results -verify_result_line_number 12 "$ELASTICSEARCH_INDEX"