This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new becee2f  [FLINK-13441][e2e] Add e2e test for SQL batch job
becee2f is described below

commit becee2f5db078231d32d207a2a2e708ecaaaedc1
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Tue Jul 30 19:33:04 2019 +0200

    [FLINK-13441][e2e] Add e2e test for SQL batch job
    
    This closes #9359.
---
 .../flink-batch-sql-test/pom.xml                   |  73 ++++++++++
 .../flink/sql/tests/BatchSQLTestProgram.java       | 160 +++++++++++++++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   1 +
 .../test-scripts/test_batch_sql.sh                 |  88 ++++++++++++
 tools/travis/splits/split_misc.sh                  |   1 +
 tools/travis/splits/split_misc_hadoopfree.sh       |   1 +
 7 files changed, 325 insertions(+)
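
For reference, a minimal sketch of how one might run just this new test locally
(assumptions, not part of this commit: a built Flink distribution, the test jar
already packaged, and common.sh picking up the rest of the environment; the
/path/to value below is hypothetical):

    export FLINK_DIR=/path/to/build-target   # hypothetical: your built Flink dist
    cd flink-end-to-end-tests
    ./test-scripts/test_batch_sql.sh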

diff --git a/flink-end-to-end-tests/flink-batch-sql-test/pom.xml b/flink-end-to-end-tests/flink-batch-sql-test/pom.xml
new file mode 100644
index 0000000..fe5eadd
--- /dev/null
+++ b/flink-end-to-end-tests/flink-batch-sql-test/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.10-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-batch-sql-test_${scala.binary.version}</artifactId>
+       <name>flink-batch-sql-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>BatchSQLTestProgram</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <finalName>BatchSQLTestProgram</finalName>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.sql.tests.BatchSQLTestProgram</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
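
As a rough guide, the shaded job jar that the test script expects could be
built like this (a sketch, assuming the parent flink-end-to-end-tests module
and its dependencies are already available in the local Maven repository):

    cd flink-end-to-end-tests/flink-batch-sql-test
    mvn clean package -nsu                 # -nsu: skip snapshot updates
    ls target/BatchSQLTestProgram.jar      # finalName set by the shade plugin
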
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
new file mode 100644
index 0000000..615b6d6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.tests;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.io.IteratorInputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * End-to-end test for batch SQL queries.
+ *
+ * <p>The sources are generated and bounded. The result is always constant.
+ *
+ * <p>Parameters:
+ * -outputPath output file path for CsvTableSink;
+ * -sqlStatement SQL statement that will be executed as sqlUpdate
+ */
+public class BatchSQLTestProgram {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool params = ParameterTool.fromArgs(args);
+               String outputPath = params.getRequired("outputPath");
+               String sqlStatement = params.getRequired("sqlStatement");
+
+               TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inBatchMode()
+                       .build());
+
+               tEnv.registerTableSource("table1", new GeneratorTableSource(10, 100, 60, 0));
+               tEnv.registerTableSource("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
+               tEnv.registerTableSink("sinkTable",
+                       new CsvTableSink(outputPath)
+                               .configure(new String[]{"f0", "f1"}, new TypeInformation[]{Types.INT, Types.SQL_TIMESTAMP}));
+
+               tEnv.sqlUpdate(sqlStatement);
+               tEnv.execute("TestSqlJob");
+       }
+
+       /**
+        * TableSource for generated data.
+        */
+       public static class GeneratorTableSource extends InputFormatTableSource<Row> {
+
+               private final int numKeys;
+               private final float recordsPerKeyAndSecond;
+               private final int durationSeconds;
+               private final int offsetSeconds;
+
+               GeneratorTableSource(int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+                       this.numKeys = numKeys;
+                       this.recordsPerKeyAndSecond = recordsPerKeyAndSecond;
+                       this.durationSeconds = durationSeconds;
+                       this.offsetSeconds = offsetSeconds;
+               }
+
+               @Override
+               public InputFormat<Row, ?> getInputFormat() {
+                       return new IteratorInputFormat<>(
+                               DataGenerator.create(numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds));
+               }
+
+               @Override
+               public DataType getProducedDataType() {
+                       return getTableSchema().toRowDataType();
+               }
+
+               @Override
+               public TableSchema getTableSchema() {
+                       return TableSchema.builder()
+                               .field("key", DataTypes.INT())
+                               .field("rowtime", DataTypes.TIMESTAMP(3))
+                               .field("payload", DataTypes.STRING())
+                               .build();
+               }
+       }
+
+       /**
+        * Iterator for generated data.
+        */
+       public static class DataGenerator implements Iterator<Row>, Serializable {
+               private static final long serialVersionUID = 1L;
+
+               final int numKeys;
+
+               private int keyIndex = 0;
+
+               private final long durationMs;
+               private final long stepMs;
+               private final long offsetMs;
+               private long ms = 0;
+
+               static DataGenerator create(int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
+                       int sleepMs = (int) (1000 / rowsPerKeyAndSecond);
+                       return new DataGenerator(numKeys, durationSeconds * 1000, sleepMs, offsetSeconds * 2000L);
+               }
+
+               DataGenerator(int numKeys, long durationMs, long stepMs, long offsetMs) {
+                       this.numKeys = numKeys;
+                       this.durationMs = durationMs;
+                       this.stepMs = stepMs;
+                       this.offsetMs = offsetMs;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return ms < durationMs;
+               }
+
+               @Override
+               public Row next() {
+                       if (!hasNext()) {
+                               throw new NoSuchElementException();
+                       }
+                       Row row = Row.of(
+                               keyIndex,
+                               LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC),
+                               "Some payload...");
+                       ++keyIndex;
+                       if (keyIndex >= numKeys) {
+                               keyIndex = 0;
+                               ms += stepMs;
+                       }
+                       return row;
+               }
+       }
+}
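
A quick sanity check on the data volume implied by DataGenerator (my own
arithmetic, not part of the commit): each next() call emits one row, the key
index cycles through numKeys, and the timestamp advances by stepMs per full
key cycle until durationMs is reached.

    # table1 = GeneratorTableSource(10, 100, 60, 0)
    numKeys=10; rowsPerKeyAndSecond=100; durationSeconds=60
    stepMs=$(( 1000 / rowsPerKeyAndSecond ))               # 10 ms per key cycle
    rows=$(( numKeys * durationSeconds * 1000 / stepMs ))  # rows until durationMs
    echo "table1 rows: ${rows}"                            # 60000
    # table2 uses a float rate (0.2f), so in the Java code stepMs =
    # (int) (1000 / 0.2f) = 5000, giving 5 * 60000 / 5000 = 60 rows.
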
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 85af090..4ae2c2d 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -41,6 +41,7 @@ under the License.
                <module>flink-dataset-allround-test</module>
                <module>flink-dataset-fine-grained-recovery-test</module>
                <module>flink-datastream-allround-test</module>
+               <module>flink-batch-sql-test</module>
                <module>flink-stream-sql-test</module>
                <module>flink-bucketing-sink-test</module>
                <module>flink-distributed-cache-via-blob-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1f3b445..3b34b560 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -123,6 +123,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
 run_test "DataSet allround end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
 run_test "Streaming SQL end-to-end test (Old planner)" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" 
"skip_check_exceptions"
 run_test "Streaming bucketing end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" 
"skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
new file mode 100755
index 0000000..dc9b931
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -0,0 +1,88 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Test for a SQL (batch mode) job that runs successfully on a Flink cluster with fewer slots (1) than the job's total slots (9).
+set -Eeuo pipefail
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-batch-sql-test/target/BatchSQLTestProgram.jar
+
+OUTPUT_FILE_PATH="${TEST_DATA_DIR}/out/result/results.csv"
+
+function sqlJobQuery() {
+    local tumbleWindowSizeSeconds=10
+
+    overQuery=$(cat <<SQL
+SELECT key, rowtime, 42 AS cnt FROM table1
+SQL
+)
+
+    tumbleQuery=$(cat <<SQL
+SELECT
+    key,
+    CASE SUM(cnt) / COUNT(*) WHEN 101 THEN 1 WHEN -1 THEN NULL ELSE 99 END AS correct,
+    TUMBLE_START(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND) AS wStart,
+    TUMBLE_ROWTIME(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND) AS rowtime
+FROM (${overQuery})
+WHERE rowtime > TIMESTAMP '1970-01-01 00:00:01'
+GROUP BY key, TUMBLE(rowtime, INTERVAL '${tumbleWindowSizeSeconds}' SECOND)
+SQL
+)
+
+    joinQuery=$(cat <<SQL
+SELECT
+    t1.key,
+    t2.rowtime AS rowtime,
+    t2.correct,
+    t2.wStart
+FROM table2 t1, (${tumbleQuery}) t2
+WHERE
+    t1.key = t2.key AND
+    t1.rowtime BETWEEN t2.rowtime AND t2.rowtime + INTERVAL '${tumbleWindowSizeSeconds}' SECOND
+SQL
+)
+
+    echo "
+SELECT
+    SUM(correct) AS correct,
+    TUMBLE_START(rowtime, INTERVAL '20' SECOND) AS rowtime
+FROM (${joinQuery})
+GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND)"
+}
+
+set_config_key "taskmanager.numberOfTaskSlots" "1"
+
+function sql_cleanup() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+}
+on_exit sql_cleanup
+
+start_cluster
+
+# The job needs 2 x (1 + 1 + 1 + 1) + 1 = 9 slots in total
+$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
+    "INSERT INTO sinkTable $(sqlJobQuery)"
+
+# check result:
+#1980,1970-01-01 00:00:00.0
+#1980,1970-01-01 00:00:20.0
+#1980,1970-01-01 00:00:40.0
+check_result_hash "BatchSQL" "${OUTPUT_FILE_PATH}" 
"c7ccd2c3a25c3e06616806cf6aecaa66"
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index 9bc5a27..97f811d 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -49,6 +49,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
 run_test "DataSet allround end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
 run_test "Streaming SQL end-to-end test (Old planner)" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" 
"skip_check_exceptions"
 run_test "Streaming bucketing end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" 
"skip_check_exceptions"
diff --git a/tools/travis/splits/split_misc_hadoopfree.sh b/tools/travis/splits/split_misc_hadoopfree.sh
index a2914ab..193ad0b 100755
--- a/tools/travis/splits/split_misc_hadoopfree.sh
+++ b/tools/travis/splits/split_misc_hadoopfree.sh
@@ -49,6 +49,7 @@ run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scrip
 run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
 run_test "DataSet allround end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
+run_test "Batch SQL end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
 run_test "Streaming SQL end-to-end test (Old planner)" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
 run_test "Streaming SQL end-to-end test (Blink planner)" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink" 
"skip_check_exceptions"
 run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" 
"skip_check_exceptions"
