[https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361776#comment-15361776]
ASF GitHub Bot commented on FLINK-3190:
---------------------------------------
Github user fijolekProjects commented on a diff in the pull request:
https://github.com/apache/flink/pull/1954#discussion_r69491638
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
@@ -174,71 +140,54 @@ private void validateConstraints(ExecutionGraph eg) {
@Test
public void testRestartAutomatically() throws Exception {
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleActorGateway(TestingUtils.directExecutionContext()),
- NUM_TASKS);
+ RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 1000);
+ Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+ ExecutionGraph eg = executionGraphInstanceTuple.f0;
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(instance);
-
- JobVertex sender = new JobVertex("Task");
- sender.setInvokableClass(Tasks.NoOpInvokable.class);
- sender.setParallelism(NUM_TASKS);
-
- JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutionContext(),
- new JobID(),
- "Test job",
- new Configuration(),
- ExecutionConfigTest.getSerializedConfig(),
- AkkaUtils.getDefaultTimeout(),
- new FixedDelayRestartStrategy(1, 1000));
-
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+ restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
+ }
- assertEquals(JobStatus.CREATED, eg.getState());
+ @Test
+ public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
+ FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, TimeUnit.SECONDS, 0);
+ FiniteDuration timeout = new FiniteDuration(50, TimeUnit.MILLISECONDS);
+ Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+ ExecutionGraph eg = executionGraphInstanceTuple.f0;
+
+ restartAfterFailure(eg, timeout, false);
+ restartAfterFailure(eg, timeout, false);
+ //failure rate limit not exceeded yet, so task is running
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ Thread.sleep(1000); //wait for a second to restart limit rate
- eg.scheduleForExecution(scheduler);
+ restartAfterFailure(eg, timeout, false);
+ restartAfterFailure(eg, timeout, false);
+ makeAFailureAndWait(eg, timeout);
--- End diff --
I deleted the first half of the test (it's already covered by the test below)
and increased the interval to 2 seconds.
> Retry rate limits for DataStream API
> ------------------------------------
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
> Issue Type: Improvement
> Reporter: Sebastian Klemke
> Assignee: Michał Fijołek
> Priority: Minor
>
> For a long-running stream processing job, absolute numbers of retries don't
> make much sense: the job will accumulate transient errors over time and will
> eventually die when the thresholds are exceeded. Rate limits are better suited
> to this scenario: a job should only die if it fails too often within a given
> time frame. To better overcome transient errors, retry delays could be used, as
> suggested in other issues.
> Absolute numbers of retries can still make sense if failing operators make no
> progress at all. We can measure progress by OperatorState changes and by
> observing output, as long as the operator in question is not a sink. If the
> operator state changes and/or the operator produces output, we can assume it
> is making progress.
> As an example, let's say we configured a retry rate limit of 10 retries per
> hour and have a non-sink operator A. If the operator fails once every 10
> minutes and produces output between failures, this should not lead to job
> termination. But if the operator fails 11 times in an hour, or produces no
> output between 11 consecutive failures, the job should be terminated.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)