This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new ae56cd7  [FLINK-13441][e2e] Add e2e test for SQL batch job
ae56cd7 is described below

commit ae56cd7ab4d2f4811bddd1f4c7a40e5e97e01848
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Tue Jul 30 19:33:04 2019 +0200

    [FLINK-13441][e2e] Add e2e test for SQL batch job

    This closes #9359.
---
 .../flink-batch-sql-test/pom.xml               |  73 ++++++++++
 .../flink/sql/tests/BatchSQLTestProgram.java   | 160 +++++++++++++++++++++
 flink-end-to-end-tests/pom.xml                 |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh    |   1 +
 .../test-scripts/test_batch_sql.sh             |  88 ++++++++++++
 tools/travis/splits/split_misc.sh              |   1 +
 tools/travis/splits/split_misc_hadoopfree.sh   |   1 +
 7 files changed, 325 insertions(+)

diff --git a/flink-end-to-end-tests/flink-batch-sql-test/pom.xml b/flink-end-to-end-tests/flink-batch-sql-test/pom.xml
new file mode 100644
index 0000000..fe5eadd
--- /dev/null
+++ b/flink-end-to-end-tests/flink-batch-sql-test/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-batch-sql-test_${scala.binary.version}</artifactId>
+	<name>flink-batch-sql-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>BatchSQLTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>BatchSQLTestProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.sql.tests.BatchSQLTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
new file mode 100644
index 0000000..615b6d6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.io.IteratorInputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * End-to-end test for batch SQL queries.
+ *
+ * <p>The sources are generated and bounded. The result is always constant.
+ *
+ * <p>Parameters:
+ * -outputPath output file path for CsvTableSink;
+ * -sqlStatement SQL statement that will be executed as sqlUpdate
+ */
+public class BatchSQLTestProgram {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool params = ParameterTool.fromArgs(args);
+		String outputPath = params.getRequired("outputPath");
+		String sqlStatement = params.getRequired("sqlStatement");
+
+		TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inBatchMode()
+			.build());
+
+		tEnv.registerTableSource("table1", new GeneratorTableSource(10, 100, 60, 0));
+		tEnv.registerTableSource("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
+		tEnv.registerTableSink("sinkTable",
+			new CsvTableSink(outputPath)
+				.configure(new String[]{"f0", "f1"}, new TypeInformation[]{Types.INT, Types.SQL_TIMESTAMP}));
+
+		tEnv.sqlUpdate(sqlStatement);
+		tEnv.execute("TestSqlJob");
+	}
+
+	/**
+	 * TableSource for generated data.
+	 */
+	public static class GeneratorTableSource extends InputFormatTableSource<Row> {
+
+		private final int numKeys;
+		private final float recordsPerKeyAndSecond;
+		private final int durationSeconds;
+		private final int offsetSeconds;
+
+		GeneratorTableSource(int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+			this.numKeys = numKeys;
+			this.recordsPerKeyAndSecond = recordsPerKeyAndSecond;
+			this.durationSeconds = durationSeconds;
+			this.offsetSeconds = offsetSeconds;
+		}
+
+		@Override
+		public InputFormat<Row, ?> getInputFormat() {
+			return new IteratorInputFormat<>(
+				DataGenerator.create(numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds));
+		}
+
+		@Override
+		public DataType getProducedDataType() {
+			return getTableSchema().toRowDataType();
+		}
+
+		@Override
+		public TableSchema getTableSchema() {
+			return TableSchema.builder()
+				.field("key", DataTypes.INT())
+				.field("rowtime", DataTypes.TIMESTAMP(3))
+				.field("payload", DataTypes.STRING())
+				.build();
+		}
+	}
+
+	/**
+	 * Iterator for generated data.
+	 */
+	public static class DataGenerator implements Iterator<Row>, Serializable {
+		private static final long serialVersionUID = 1L;
+
+		final int numKeys;
+
+		private int keyIndex = 0;
+
+		private final long durationMs;
+		private final long stepMs;
+		private final long offsetMs;
+		private long ms = 0;
+
+		static DataGenerator create(int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+			int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+			return new DataGenerator(numKeys, durationSeconds * 1000, sleepMs, offsetSeconds * 2000L);
+		}
+
+		DataGenerator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+			this.numKeys = numKeys;
+			this.durationMs = durationMs;
+			this.stepMs = stepMs;
+			this.offsetMs = offsetMs;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return ms < durationMs;
+		}
+
+		@Override
+		public Row next() {
+			if (!hasNext()) {
+				throw new NoSuchElementException();
+			}
+			Row row = Row.of(
+				keyIndex,
+				LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC),
+				"Some payload...");
+			++keyIndex;
+			if (keyIndex >= numKeys) {
+				keyIndex = 0;
+				ms += stepMs;
+			}
+			return row;
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 85af090..4ae2c2d 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -41,6 +41,7 @@ under the License.
 		<module>flink-dataset-allround-test</module>
 		<module>flink-dataset-fine-grained-recovery-test</module>
 		<module>flink-datastream-allround-test</module>
+		<module>flink-batch-sql-test</module>
 		<module>flink-stream-sql-test</module>
 		<module>flink-bucketing-sink-test</module>
 		<module>flink-distributed-cache-via-blob-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1f3b445..3b34b560 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -123,6 +123,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
 run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
 run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
new file mode 100755
index 0000000..dc9b931
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -0,0 +1,88 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Test for a SQL (batch mode) job that runs successfully on a Flink cluster with fewer slots (1) than the job's total slots (9).
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-batch-sql-test/target/BatchSQLTestProgram.jar
+
+OUTPUT_FILE_PATH="${TEST_DATA_DIR}/out/result/results.csv"
+
+function sqlJobQuery() {
+  local tumbleWindowSizeSeconds=10
+
+  overQuery=$(cat <<SQL
+SELECT key, rowtime, 42 AS cnt FROM table1
+SQL
+)
+
+  tumbleQuery=$(cat <<SQL
+SELECT
+  key,
+  CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 WHEN -1 THEN NULL ELSE 99 END AS correct,
+  TUMBLE_START(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND) AS wStart,
+  TUMBLE_ROWTIME(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND) AS rowtime
+FROM (${overQuery})
+WHERE rowtime > TIMESTAMP '1970-01-01 00:00:01'
+GROUP BY key, TUMBLE(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND)
+SQL
+)
+
+  joinQuery=$(cat <<SQL
+SELECT
+  t1.key,
+  t2.rowtime AS rowtime,
+  t2.correct,
+  t2.wStart
+FROM table2 t1, (${tumbleQuery}) t2
+WHERE
+  t1.key = t2.key AND
+  t1.rowtime BETWEEN t2.rowtime AND t2.rowtime + INTERVAL '${tumbleWindowSizeSeconds}' SECOND
+SQL
+)
+
+  echo "
+SELECT
+  SUM(correct) AS correct,
+  TUMBLE_START(rowtime, INTERVAL '20' SECOND) AS rowtime
+FROM (${joinQuery})
+GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND)"
+}
+
+set_config_key "taskmanager.numberOfTaskSlots" "1"
+
+function sql_cleanup() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+}
+on_exit sql_cleanup
+
+start_cluster
+
+# The job needs a total of 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
+$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
+  "INSERT INTO sinkTable $(sqlJobQuery)"
+
+# check result:
+#1980,1970-01-01 00:00:00.0
+#1980,1970-01-01 00:00:20.0
+#1980,1970-01-01 00:00:40.0
+check_result_hash "BatchSQL" "${OUTPUT_FILE_PATH}" "c7ccd2c3a25c3e06616806cf6aecaa66"
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index 9bc5a27..97f811d 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -49,6 +49,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
 run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
 run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
diff --git a/tools/travis/splits/split_misc_hadoopfree.sh b/tools/travis/splits/split_misc_hadoopfree.sh
index a2914ab..193ad0b 100755
--- a/tools/travis/splits/split_misc_hadoopfree.sh
+++ b/tools/travis/splits/split_misc_hadoopfree.sh
@@ -49,6 +49,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
 run_test "Streaming SQL end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" "skip_check_exceptions"
 run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
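
For local verification, a minimal sketch of running only this test (assuming a Flink distribution built under build-target and the existing flink-end-to-end-tests/run-single-test.sh helper; adjust paths to your checkout):

    # Build Flink and the shaded BatchSQLTestProgram.jar.
    mvn clean install -DskipTests

    # Point FLINK_DIR at the freshly built distribution and run the new test script.
    FLINK_DIR=$(pwd)/build-target \
        flink-end-to-end-tests/run-single-test.sh flink-end-to-end-tests/test-scripts/test_batch_sql.sh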