[ 
https://issues.apache.org/jira/browse/FLINK-11309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11309:
---------------------------
    Description: 
Hi all,

When running the batch WordCount example, I configured the job execution mode 
as *BATCH_FORCED* and the failover-strategy as *region*, then manually injected 
errors to make the execution fail in different phases. In some cases the job 
recovered from the failure and succeeded, but in other cases the job 
retried several times and then failed.

Example:
 # If the failure occurred before the task read any data, e.g., before 
*invokable.invoke()* in Task.java, failover succeeded.
 # If the failure occurred after the task had read data, failover did not work.

 

Problem diagnosis:

In the example described above, each ExecutionVertex is defined as a 
restart region, and the ResultPartitionType between executions is *BLOCKING*. 
Thus, *SpillableSubpartition* and *SpillableSubpartitionView* are used to 
write/read shuffle data. Each data block is represented as a *BufferConsumer* 
stored in a list called *buffers*. When a task requests input data from 
*SpillableSubpartitionView*, the *BufferConsumer* is REMOVED from *buffers*. 
Thus, when a failure occurs after data has been read, some *BufferConsumers* 
have already been released, so even though the task is retried, its input data 
is incomplete.
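
The effect of a destructive read on a retry can be shown with a small toy model. This is only a sketch: *DestructiveSubpartition* and *DestructiveReadDemo* are hypothetical names invented for illustration, strings stand in for *BufferConsumer* objects, and none of this is the actual Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Toy model only: a subpartition whose reads remove data from its queue. */
final class DestructiveSubpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();

    DestructiveSubpartition(List<String> data) {
        buffers.addAll(data);
    }

    /** Returns the next buffer and removes it, or null when nothing is left. */
    String poll() {
        return buffers.poll();
    }
}

final class DestructiveReadDemo {
    /** Reads at most maxBuffers buffers, simulating one consumer attempt. */
    static List<String> attempt(DestructiveSubpartition partition, int maxBuffers) {
        List<String> seen = new ArrayList<>();
        String next;
        while (seen.size() < maxBuffers && (next = partition.poll()) != null) {
            seen.add(next);
        }
        return seen;
    }

    public static void main(String[] args) {
        DestructiveSubpartition partition =
                new DestructiveSubpartition(List.of("b0", "b1", "b2"));
        // The first attempt "fails" after consuming a single buffer.
        List<String> first = attempt(partition, 1);
        // The retry starts over, but the consumed buffer is already gone.
        List<String> retry = attempt(partition, Integer.MAX_VALUE);
        System.out.println("first attempt read: " + first);
        System.out.println("retry read: " + retry);
    }
}
```

In this model the retried attempt never sees the buffer consumed by the failed attempt, which matches the incomplete input described above.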

 

Fix Proposal:
 # *BufferConsumer* should not be removed from *buffers* until the 
*ExecutionVertex* terminates.
 # *SpillableSubpartition* should not be released until the *ExecutionVertex* 
terminates.
 # Each *SpillableSubpartition* contains multiple *SpillableSubpartitionViews*, 
each of which corresponds to an *Execution*.
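
The three points of the proposal can be sketched together in a toy model. This assumes a much simpler structure than the real classes: *RetainingSubpartition*, its inner *View*, and *RetainingReadDemo* are hypothetical names, strings stand in for *BufferConsumer* objects, and spilling to disk is ignored. It is one possible shape of the idea, not a patch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: buffers are retained and every attempt reads via its own view. */
final class RetainingSubpartition {
    private final List<String> buffers;
    private boolean released;

    RetainingSubpartition(List<String> data) {
        this.buffers = new ArrayList<>(data);
    }

    /** Creates an independent read view, one per consumer execution attempt. */
    View createView() {
        if (released) {
            throw new IllegalStateException("subpartition already released");
        }
        return new View();
    }

    /** Meant to be called once the vertex has terminated; only now is data dropped. */
    void release() {
        released = true;
        buffers.clear();
    }

    final class View {
        private int position; // read cursor private to this view

        /** Returns the next buffer without removing it, or null at the end. */
        String next() {
            return position < buffers.size() ? buffers.get(position++) : null;
        }
    }
}

final class RetainingReadDemo {
    /** Drains a view from its current position to the end. */
    static List<String> readAll(RetainingSubpartition.View view) {
        List<String> seen = new ArrayList<>();
        for (String b = view.next(); b != null; b = view.next()) {
            seen.add(b);
        }
        return seen;
    }

    public static void main(String[] args) {
        RetainingSubpartition partition =
                new RetainingSubpartition(List.of("b0", "b1", "b2"));
        RetainingSubpartition.View failedAttempt = partition.createView();
        failedAttempt.next(); // the attempt reads one buffer, then fails
        // The retry gets a fresh view and sees the complete input again.
        System.out.println("retry read: " + readAll(partition.createView()));
        partition.release(); // data is dropped only after termination
    }
}
```

Keeping a separate cursor per view is what lets a retried *Execution* start from the first buffer, at the cost of holding all buffers until release.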

 


> Batch Job Failover Using RestartPipelinedRegionStrategy Fails in Some Scenes 
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-11309
>                 URL: https://issues.apache.org/jira/browse/FLINK-11309
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.6.2, 1.7.0, 1.7.1
>            Reporter: BoWang
>            Assignee: BoWang
>            Priority: Critical



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
