Repository: flink Updated Branches: refs/heads/release-1.5 871dccbdf -> c5fa87f69
[FLINK-9067] [e2eTests] Fix test and simplify code This closes #5759. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5fa87f6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5fa87f6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5fa87f6 Branch: refs/heads/release-1.5 Commit: c5fa87f6923bb81320affd8a0c27250d8ad92b26 Parents: 11ca9fa Author: Timo Walther <twal...@apache.org> Authored: Tue Mar 27 14:20:47 2018 +0200 Committer: Timo Walther <twal...@apache.org> Committed: Tue Mar 27 15:57:55 2018 +0200 ---------------------------------------------------------------------- .../flink-stream-sql-test/pom.xml | 21 +---------------- .../flink/sql/tests/StreamSQLTestProgram.java | 12 +++++----- .../test-scripts/test_streaming_sql.sh | 24 +++++++++++++++----- 3 files changed, 25 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c5fa87f6/flink-end-to-end-tests/flink-stream-sql-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml index 860432c..1b8e4f7 100644 --- a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml +++ b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml @@ -73,6 +73,7 @@ <goal>shade</goal> </goals> <configuration> + <finalName>StreamSQLTestProgram</finalName> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> @@ -99,26 +100,6 @@ </execution> </executions> </plugin> - <!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>rename</id> - <phase>package</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <copy file="${project.basedir}/target/flink-stream-sql-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/StreamSQLTestProgram.jar" /> - </target> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/c5fa87f6/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java index 91fb4c1..e9e79ac 100644 --- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java @@ -58,7 +58,7 @@ import java.util.Map; * * <p>The stream is bounded and will complete after about a minute. * The result is always constant. - * The job is killed on the first attemped and restarted. + * The job is killed on the first attempt and restarted. * * <p>Parameters: * -outputPath Sets the path to where the result data is written. @@ -249,12 +249,12 @@ public class StreamSQLTestProgram { } @Override - public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + public List<Long> snapshotState(long checkpointId, long timestamp) { return Collections.singletonList(ms); } @Override - public void restoreState(List<Long> state) throws Exception { + public void restoreState(List<Long> state) { for (Long l : state) { ms += l; } @@ -272,7 +272,7 @@ public class StreamSQLTestProgram { private int lostRecordCnt = 0; @Override - public Row map(Row value) throws Exception { + public Row map(Row value) { // the both counts are the same only in the first execution attempt if (saveRecordCnt == 1 && lostRecordCnt == 1) { @@ -294,12 +294,12 @@ public class StreamSQLTestProgram { } @Override - public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + public List<Integer> snapshotState(long checkpointId, long timestamp) { return Collections.singletonList(saveRecordCnt); } @Override - public void restoreState(List<Integer> state) throws Exception { + public void restoreState(List<Integer> state) { for (Integer i : state) { saveRecordCnt += i; } http://git-wip-us.apache.org/repos/asf/flink/blob/c5fa87f6/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh b/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh index ed7b16c..21c64a1 100755 --- a/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh +++ b/flink-end-to-end-tests/test-scripts/test_streaming_sql.sh @@ -31,13 +31,25 @@ $FLINK_DIR/bin/taskmanager.sh start $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR -outputPath $TEST_DATA_DIR/out/result -stop_cluster -$FLINK_DIR/bin/taskmanager.sh stop-all +function sql_cleanup() { -# remove flink-table from lib folder -rm $FLINK_DIR/lib/flink-table*jar + stop_cluster + $FLINK_DIR/bin/taskmanager.sh stop-all + + # remove flink-table from lib folder + rm $FLINK_DIR/lib/flink-table*jar + + # make sure to run regular cleanup as well + cleanup +} +trap sql_cleanup INT +trap sql_cleanup EXIT # collect results from files -cat /tmp/xxx/part-0-0 /tmp/xxx/_part-0-1.pending > $TEST_DATA_DIR/out/result-complete -# check result +cat $TEST_DATA_DIR/out/result/part-0-0 $TEST_DATA_DIR/out/result/_part-0-1.pending > $TEST_DATA_DIR/out/result-complete + +# check result: +# 20,1970-01-01 00:00:00.0 +# 20,1970-01-01 00:00:20.0 +# 20,1970-01-01 00:00:40.0 check_result_hash "StreamSQL" $TEST_DATA_DIR/out/result-complete "b29f14ed221a936211202ff65b51ee26"