This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push: new d1d31d0 [FLINK-10773] Harden resume externalized checkpoint end-to-end test d1d31d0 is described below commit d1d31d06d28021bfcf66d2b1ef2454ecfbb8a5a2 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Sun Nov 4 19:32:44 2018 +0100 [FLINK-10773] Harden resume externalized checkpoint end-to-end test Ignore the 'Artificial Failure' exceptions and rename ExceptionThrowingFailureMapper into FailureMapper to avoid false positive exception matchings. --- .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java | 6 ++++-- .../apache/flink/streaming/tests/DataStreamAllroundTestProgram.java | 6 +++--- .../{ExceptionThrowingFailureMapper.java => FailureMapper.java} | 4 ++-- flink-end-to-end-tests/test-scripts/common.sh | 1 + 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 4710100..68b752b 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -311,6 +311,8 @@ class DataStreamAllroundTestJobFactory { SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(), SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) { + private static final long serialVersionUID = -3154419724891779938L; + @Override public long extractTimestamp(Event element) { return element.getEventTime(); @@ -339,8 +341,8 @@ class DataStreamAllroundTestJobFactory { return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), TEST_SIMULATE_FAILURE.defaultValue()); } - static MapFunction<Event, Event> createExceptionThrowingFailureMapper(ParameterTool pt) { - return new ExceptionThrowingFailureMapper<>( + static MapFunction<Event, Event> createFailureMapper(ParameterTool pt) { + return new FailureMapper<>( pt.getLong( TEST_SIMULATE_FAILURE_NUM_RECORDS.key(), TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()), diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index afbc01a..c7efe42 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -32,7 +32,7 @@ import java.util.Collections; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource; -import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures; @@ -58,7 +58,7 @@ public class DataStreamAllroundTestProgram { private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper"; private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper"; private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper"; - private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper"; + private static final String FAILURE_MAPPER_NAME = "FailureMapper"; public static void main(String[] args) throws Exception { final ParameterTool pt = ParameterTool.fromArgs(args); @@ -94,7 +94,7 @@ public class DataStreamAllroundTestProgram { if (isSimulateFailures(pt)) { eventStream2 = eventStream2 - .map(createExceptionThrowingFailureMapper(pt)) + .map(createFailureMapper(pt)) .setParallelism(1) .name(FAILURE_MAPPER_NAME); } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java similarity index 94% rename from flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java rename to flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java index d758ef5..a3a1c25 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.state.CheckpointListener; * of the operator can also be configured. Note that this also takes into account * failures that were not triggered by this mapper, e.g. TaskManager failures. */ -public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener { +public class FailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener { private static final long serialVersionUID = -5286927943454740016L; @@ -41,7 +41,7 @@ public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> imp private long numProcessedRecords; private long numCompleteCheckpoints; - public ExceptionThrowingFailureMapper( + public FailureMapper( long numProcessedRecordsFailureThreshold, long numCompleteCheckpointsFailureThreshold, int maxNumFailures) { diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 32d186b..e8a4363 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -253,6 +253,7 @@ function check_logs_for_exceptions { | grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \ | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \ | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \ + | grep -v "java.lang.Exception: Artificial failure" \ | grep -ic "exception") if [[ ${exception_count} -gt 0 ]]; then echo "Found exception in log files:"