This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new d1d31d0  [FLINK-10773] Harden resume externalized checkpoint end-to-end test
d1d31d0 is described below

commit d1d31d06d28021bfcf66d2b1ef2454ecfbb8a5a2
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Sun Nov 4 19:32:44 2018 +0100

    [FLINK-10773] Harden resume externalized checkpoint end-to-end test
    
    Ignore the 'Artificial failure' exceptions and rename
    ExceptionThrowingFailureMapper to FailureMapper to avoid
    false-positive exception matches.
---
 .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java     | 6 ++++--
 .../apache/flink/streaming/tests/DataStreamAllroundTestProgram.java | 6 +++---
 .../{ExceptionThrowingFailureMapper.java => FailureMapper.java}     | 4 ++--
 flink-end-to-end-tests/test-scripts/common.sh                       | 1 +
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 4710100..68b752b 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -311,6 +311,8 @@ class DataStreamAllroundTestJobFactory {
                                        
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
                                        
SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
 
+                       private static final long serialVersionUID = 
-3154419724891779938L;
+
                        @Override
                        public long extractTimestamp(Event element) {
                                return element.getEventTime();
@@ -339,8 +341,8 @@ class DataStreamAllroundTestJobFactory {
                return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), 
TEST_SIMULATE_FAILURE.defaultValue());
        }
 
-       static MapFunction<Event, Event> 
createExceptionThrowingFailureMapper(ParameterTool pt) {
-               return new ExceptionThrowingFailureMapper<>(
+       static MapFunction<Event, Event> createFailureMapper(ParameterTool pt) {
+               return new FailureMapper<>(
                        pt.getLong(
                                TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),
                                
TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()),
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index afbc01a..c7efe42 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -32,7 +32,7 @@ import java.util.Collections;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
@@ -58,7 +58,7 @@ public class DataStreamAllroundTestProgram {
        private static final String KEYED_STATE_OPER_NAME = 
"ArtificalKeyedStateMapper";
        private static final String OPERATOR_STATE_OPER_NAME = 
"ArtificalOperatorStateMapper";
        private static final String SEMANTICS_CHECK_MAPPER_NAME = 
"SemanticsCheckMapper";
-       private static final String FAILURE_MAPPER_NAME = 
"ExceptionThrowingFailureMapper";
+       private static final String FAILURE_MAPPER_NAME = "FailureMapper";
 
        public static void main(String[] args) throws Exception {
                final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -94,7 +94,7 @@ public class DataStreamAllroundTestProgram {
 
                if (isSimulateFailures(pt)) {
                        eventStream2 = eventStream2
-                               .map(createExceptionThrowingFailureMapper(pt))
+                               .map(createFailureMapper(pt))
                                .setParallelism(1)
                                .name(FAILURE_MAPPER_NAME);
                }
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
similarity index 94%
rename from flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
rename to flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
index d758ef5..a3a1c25 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
  * of the operator can also be configured. Note that this also takes into 
account
  * failures that were not triggered by this mapper, e.g. TaskManager failures.
  */
-public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> 
implements CheckpointListener {
+public class FailureMapper<T> extends RichMapFunction<T, T> implements 
CheckpointListener {
 
        private static final long serialVersionUID = -5286927943454740016L;
 
@@ -41,7 +41,7 @@ public class ExceptionThrowingFailureMapper<T> extends 
RichMapFunction<T, T> imp
        private long numProcessedRecords;
        private long numCompleteCheckpoints;
 
-       public ExceptionThrowingFailureMapper(
+       public FailureMapper(
                        long numProcessedRecordsFailureThreshold,
                        long numCompleteCheckpointsFailureThreshold,
                        int maxNumFailures) {
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 32d186b..e8a4363 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -253,6 +253,7 @@ function check_logs_for_exceptions {
       | grep -v "Caused by: java.lang.ClassNotFoundException: 
org.apache.hadoop.conf.Configuration" \
       | grep -v "java.lang.NoClassDefFoundError: 
org/apache/hadoop/yarn/exceptions/YarnException" \
       | grep -v "java.lang.NoClassDefFoundError: 
org/apache/hadoop/conf/Configuration" \
+      | grep -v "java.lang.Exception: Artificial failure" \
       | grep -ic "exception")
   if [[ ${exception_count} -gt 0 ]]; then
     echo "Found exception in log files:"

Reply via email to