Repository: flink Updated Branches: refs/heads/release-1.5 5621a1e9f -> 3df5c6685
[FLINK-8980] [e2e] Add a BucketingSink end-to-end test This closes #5813. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3df5c668 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3df5c668 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3df5c668 Branch: refs/heads/release-1.5 Commit: 3df5c6685091cea6c50758769c37ad3d6390f94a Parents: 5621a1e Author: Timo Walther <twal...@apache.org> Authored: Tue Mar 27 17:44:58 2018 +0200 Committer: Timo Walther <twal...@apache.org> Committed: Thu Apr 19 15:12:08 2018 +0200 ---------------------------------------------------------------------- .../flink-bucketing-sink-test/pom.xml | 112 ++++++++++++ .../tests/BucketingSinkTestProgram.java | 181 +++++++++++++++++++ .../flink-dataset-allround-test/pom.xml | 2 + .../pom.xml | 25 +-- .../flink/sql/tests/StreamSQLTestProgram.java | 7 + flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/test-scripts/common.sh | 6 + .../test-scripts/test_streaming_bucketing.sh | 111 ++++++++++++ 8 files changed, 423 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml b/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml new file mode 100644 index 0000000..3967ae7 --- /dev/null +++ b/flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml @@ -0,0 +1,112 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> + <!-- must equal the version of the release-1.5 branch this module lives on; a mismatch makes Maven ignore the ".." relativePath parent --> + <version>1.5-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-bucketing-sink-test</artifactId> + <name>flink-bucketing-sink-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <exclusions> + <!-- Needed for proper dependency convergence --> + <exclusion> + 
<groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>BucketingSinkTestProgram</finalName> + <artifactSet> + <excludes> + <exclude>com.google.code.findbugs:jsr305</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.BucketingSinkTestProgram</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> + http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java new file mode 100644 index 0000000..26d7e91 --- /dev/null +++ b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.fs.Clock; +import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer; +import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * End-to-end test for the bucketing sink. 
+ * + * <p>Runs a small stateful job and writes its output into one bucket per key. + * + * <p>The generated stream is finite: the job finishes after roughly one minute and + * always produces the same result. + * + * <p>Parameters: + * -outputPath Sets the path to where the result data is written. + */ +public class BucketingSinkTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool parameters = ParameterTool.fromArgs(args); + final String outputPath = parameters.getRequired("outputPath"); + + // logical time step of the generator; its negation seeds the mapper's "previous timestamp" + final int idlenessMs = 10; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // up to 3 restarts, 10 seconds apart; checkpoint every 4000 ms + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); + env.enableCheckpointing(4000); + + // each record goes to the bucket that KeyBucketer derives from outputPath + final BucketingSink<Tuple4<Integer, Long, Integer, String>> sink = new BucketingSink<>(outputPath); + sink.setBucketer(new KeyBucketer()); + + // 10 keys, one record per key every idlenessMs, for 60 seconds + final Generator generator = new Generator(10, idlenessMs, 60); + + // source -> partition by key -> stateful per-key mapping -> bucketing sink + env.addSource(generator) + .keyBy(0) + .map(new SubtractingMapper(-1L * idlenessMs)) + .addSink(sink); + + env.execute(); + } + + /** + * Derives the bucket by appending the first field (the key) to the base path, + * e.g. a base path ending in "result" yields "result8" for key 8. + */ + public static class KeyBucketer implements Bucketer<Tuple4<Integer, Long, Integer, String>> { + + @Override + public Path getBucketPath(Clock clock, Path basePath, Tuple4<Integer, Long, Integer, String> element) { + final String bucketSuffix = String.valueOf(element.f0); + return basePath.suffix(bucketSuffix); + } + } + + /** + * Subtracts the timestamp of the previous element from the current element.
+ * + * <p>Emits (key, timestamp delta, number of earlier elements of the key, payload). For the first + * element of a key the delta is taken against the initial value passed to the constructor. + */ + public static class SubtractingMapper extends RichMapFunction<Tuple3<Integer, Long, String>, Tuple4<Integer, Long, Integer, String>> { + + private final long initialValue; + + private ValueState<Integer> counter; + private ValueState<Long> last; + + public SubtractingMapper(long initialValue) { + this.initialValue = initialValue; + } + + @Override + public void open(Configuration parameters) { + final ValueStateDescriptor<Integer> counterDescriptor = new ValueStateDescriptor<>("counter", Types.INT); + final ValueStateDescriptor<Long> lastDescriptor = new ValueStateDescriptor<>("last", Types.LONG); + counter = getRuntimeContext().getState(counterDescriptor); + last = getRuntimeContext().getState(lastDescriptor); + } + + @Override + public Tuple4<Integer, Long, Integer, String> map(Tuple3<Integer, Long, String> value) throws IOException { + // per-key element index: 0 for the first element, then incremented by one + final Integer storedCount = counter.value(); + final int index = storedCount == null ? 0 : storedCount; + counter.update(index + 1); + + // timestamp of the previous element of this key (initialValue if there is none yet) + final Long storedLast = last.value(); + final long previous = storedLast == null ? initialValue : storedLast; + last.update(value.f1); + + return Tuple4.of(value.f0, value.f1 - previous, index, value.f2); + } + } + + /** + * Data-generating source function.
+ * + * <p>Each step emits one record per key carrying the current logical timestamp, then advances + * that timestamp by idlenessMs and sleeps for the same number of milliseconds. The source + * finishes once the timestamp reaches the configured duration or when it is cancelled. + */ + public static class Generator implements SourceFunction<Tuple3<Integer, Long, String>>, ListCheckpointed<Long> { + + private final int numKeys; + private final int idlenessMs; + private final long durationMs; + + /** Logical timestamp of the next step; checkpointed so that a restored source resumes from it. */ + private long ms = 0; + + /** Cleared by {@link #cancel()} so that the emission loop in {@link #run} terminates. */ + private volatile boolean running = true; + + /** + * Creates a generator. + * + * @param numKeys number of keys (0 to numKeys - 1) emitted per step + * @param idlenessMs logical time step between two steps, also slept between them + * @param durationSeconds logical time after which the source finishes + */ + public Generator(int numKeys, int idlenessMs, int durationSeconds) { + this.numKeys = numKeys; + this.idlenessMs = idlenessMs; + // multiply as long so that large durations cannot overflow int + this.durationMs = durationSeconds * 1000L; + } + + @Override + public void run(SourceContext<Tuple3<Integer, Long, String>> ctx) throws Exception { + while (running && ms < durationMs) { + // emit a whole step and advance the timestamp while holding the checkpoint lock + synchronized (ctx.getCheckpointLock()) { + for (int i = 0; i < numKeys; i++) { + ctx.collect(Tuple3.of(i, ms, "Some payload...")); + } + ms += idlenessMs; + } + Thread.sleep(idlenessMs); + } + } + + @Override + public void cancel() { + // without this the loop in run() would keep emitting until the duration is reached + running = false; + } + + @Override + public List<Long> snapshotState(long checkpointId, long timestamp) { + return Collections.singletonList(ms); + } + + @Override + public void restoreState(List<Long> state) { + // non-parallel SourceFunction: expected to hold the single timestamp written by snapshotState + for (Long l : state) { + ms += l; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml b/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml index b247743..2592117 100644 --- a/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml +++ b/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml @@ -38,11 +38,13 @@ under the License. 
<groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml b/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml index a7d09c0..f131d29 100644 --- a/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml +++ b/flink-end-to-end-tests/flink-parent-child-classloading-test/pom.xml @@ -39,11 +39,13 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> </dependencies> @@ -63,7 +65,7 @@ <goal>jar</goal> </goals> <configuration> - <classifier>ClassLoaderTestProgram</classifier> + <finalName>ClassLoaderTestProgram</finalName> <archive> <manifestEntries> @@ -80,27 +82,6 @@ </execution> </executions> </plugin> - - <!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>rename</id> - <phase>package</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target> - <copy 
file="${project.basedir}/target/flink-parent-child-classloading-test_${scala.binary.version}-${project.version}-ClassLoaderTestProgram.jar" tofile="${project.basedir}/target/ClassLoaderTestProgram.jar" /> - </target> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java index e9e79ac..bc07d61 100644 --- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java @@ -19,6 +19,8 @@ package org.apache.flink.sql.tests; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -46,6 +48,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * End-to-end test for Stream SQL queries. 
@@ -71,6 +74,10 @@ public class StreamSQLTestProgram { String outputPath = params.getRequired("outputPath"); StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart( + 3, + Time.of(10, TimeUnit.SECONDS) + )); sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); sEnv.enableCheckpointing(4000); sEnv.getConfig().setAutoWatermarkInterval(1000); http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 26f192a..641a83b 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -38,6 +38,7 @@ under the License. <module>flink-parent-child-classloading-test</module> <module>flink-dataset-allround-test</module> <module>flink-stream-sql-test</module> + <module>flink-bucketing-sink-test</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/test-scripts/common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index fd592ff..73885e1 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -301,6 +301,12 @@ function s3_delete { https://${bucket}.s3.amazonaws.com/${s3_file} } +# Kills (SIGKILL) one randomly chosen local TaskManager JVM; returns 1 if none is running. +function kill_random_taskmanager { + # jps prints "<pid> <main class>" per JVM; pick the pid of a random TaskManager + KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}') + if [ -z "$KILL_TM" ]; then + echo "No running TaskManager found to kill." + return 1 + fi + kill -9 "$KILL_TM" + echo "TaskManager $KILL_TM killed." 
+} + + # make sure to clean up even in case of failures function cleanup { stop_cluster http://git-wip-us.apache.org/repos/asf/flink/blob/3df5c668/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh new file mode 100755 index 0000000..b4c3ce9 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh @@ -0,0 +1,111 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar + +# enable DEBUG logging level to retrieve truncate length later +sed -i -e 's/#log4j.logger.org.apache.flink=INFO/log4j.logger.org.apache.flink=DEBUG/g' "$FLINK_DIR/conf/log4j.properties" + +start_cluster +"$FLINK_DIR/bin/taskmanager.sh" start +"$FLINK_DIR/bin/taskmanager.sh" start +"$FLINK_DIR/bin/taskmanager.sh" start + +function bucketing_cleanup() { + + stop_cluster + "$FLINK_DIR/bin/taskmanager.sh" stop-all + + # restore default logging level + sed -i -e 's/log4j.logger.org.apache.flink=DEBUG/#log4j.logger.org.apache.flink=INFO/g' "$FLINK_DIR/conf/log4j.properties" + + # make sure to run regular cleanup as well + cleanup +} +trap bucketing_cleanup INT +trap bucketing_cleanup EXIT + +JOB_ID=$("$FLINK_DIR/bin/flink" run -d -p 4 "$TEST_PROGRAM_JAR" -outputPath "$TEST_DATA_DIR/out/result" \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +# fail fast if the submission output did not contain a JobID +if [ -z "$JOB_ID" ]; then + echo "Could not determine the JobID of the submitted bucketing sink job." + PASS="" + exit 1 +fi + +wait_job_running "${JOB_ID}" + +sleep 40 + +echo "Killing TM" + +# kill task manager +kill_random_taskmanager + +echo "Starting TM" + +# start task manager again +"$FLINK_DIR/bin/taskmanager.sh" start + +echo "Killing 2 TMs" + +# kill two task managers again shortly after +kill_random_taskmanager +kill_random_taskmanager + +echo "Starting 2 TMs and waiting for successful completion" + +# start task manager again and let job finish +"$FLINK_DIR/bin/taskmanager.sh" start +"$FLINK_DIR/bin/taskmanager.sh" start + +# the job should complete in under 60s because half of the work has been checkpointed; +# wait 100s to leave a safety margin +sleep 100 + +# get truncate information +# e.g. 
"xxx xxx DEBUG xxx.BucketingSink - Writing valid-length file for xxx/out/result8/part-0-0 to specify valid length 74994" +LOG_LINES=$(grep -rnw "$FLINK_DIR/log" -e 'Writing valid-length file') + +# perform truncate on every line +echo "Truncating buckets" +while read -r LOG_LINE; do + # an empty LOG_LINES still yields one empty line through the here-string; nothing to truncate then + if [ -z "$LOG_LINE" ]; then + continue + fi + + # NOTE: field positions assume the log layout of the example above plus grep's "file:line:" prefix + PART=$(echo "$LOG_LINE" | awk '{ print $10 }' FS=" ") + LENGTH=$(echo "$LOG_LINE" | awk '{ print $15 }' FS=" ") + + echo "Truncating $PART to $LENGTH" + + # keep only the first $LENGTH bytes (the valid length reported by the sink); unlike dd bs=N, also works for 0 + head -c "$LENGTH" "$PART" > "$PART.truncated" + rm "$PART" + mv "$PART.truncated" "$PART" +done <<< "$LOG_LINES" + +# get all lines in pending or part files +find "${TEST_DATA_DIR}/out" -type f \( -iname "*.pending" -or -iname "part-*" \) -exec cat {} + > "${TEST_DATA_DIR}/complete_result" + +# for debugging purposes +#echo "Checking proper result..." +#for KEY in {0..9}; do +# for IDX in {0..5999}; do +# FOUND_LINES=$(grep "($KEY,10,$IDX,Some payload...)" ${TEST_DATA_DIR}/complete_result | wc -l) +# if [ ${FOUND_LINES} != 1 ] ; then +# echo "Unexpected count $FOUND_LINES for ($KEY,10,$IDX,Some payload...)" +# PASS="" +# exit 1 +# fi +# done +#done + +check_result_hash "Bucketing Sink" "$TEST_DATA_DIR/complete_result" "01aba5ff77a0ef5e5cf6a727c248bdc3"