[ https://issues.apache.org/jira/browse/SAMZA-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shanthoosh Venkataraman updated SAMZA-1979:
-------------------------------------------
    Description: 
Samza relies on a daemon thread named StreamPartitionCountMonitor, which runs in
the JobCoordinator, to detect changes in the partition count of input streams.

Here's the control flow of the StreamPartitionCountMonitor:
 * The JobCoordinator initializes the StreamPartitionCountMonitor thread with
the partition count of each input stream.

 * The StreamPartitionCountMonitor periodically fetches the partition count of
the input streams and compares it against the initial partition count.

 * For stateful jobs, if the partition count has changed, the monitor stops the
JobCoordinator.
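
The control flow above can be sketched roughly as follows. This is a
hypothetical, simplified model for illustration only, not Samza's actual
StreamPartitionCountMonitor API: the class name PartitionCountMonitorSketch,
the fetchCounts supplier (standing in for querying the input system's
metadata) and the onChange callback (standing in for stopping the
JobCoordinator of a stateful job) are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical, simplified model of the monitor described above.
 * Not Samza's real StreamPartitionCountMonitor API.
 */
class PartitionCountMonitorSketch {
  // Assumption: stands in for querying the input system's stream metadata.
  private final Supplier<Map<String, Integer>> fetchCounts;
  // Assumption: stands in for stopping the JobCoordinator of a stateful job.
  private final Runnable onChange;
  // Baseline counts, held only in this process's memory.
  private final Map<String, Integer> initialCounts;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "StreamPartitionCountMonitor");
        t.setDaemon(true); // a daemon thread, as in the description above
        return t;
      });

  PartitionCountMonitorSketch(Supplier<Map<String, Integer>> fetchCounts, Runnable onChange) {
    this.fetchCounts = fetchCounts;
    this.onChange = onChange;
    // Step 1: the baseline is whatever the counts are when the monitor is created.
    this.initialCounts = new HashMap<>(fetchCounts.get());
  }

  /** Steps 2 and 3: fetch the current counts and compare them with the baseline. */
  boolean checkOnce() {
    Map<String, Integer> current = fetchCounts.get();
    if (!current.equals(initialCounts)) {
      onChange.run();
      return true;
    }
    return false;
  }

  /** Runs checkOnce() periodically on the daemon thread. */
  void start(long intervalMs) {
    scheduler.scheduleAtFixedRate(this::checkOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  void stop() {
    scheduler.shutdownNow();
  }
}
```

In this sketch the baseline (initialCounts) exists only in the monitor's
memory; every new instance rebuilds it from the counts current at construction
time.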

 

However, for stateful jobs, this approach might not work in the following
scenarios:

A. Suppose the partition count of an input stream changes between two
scheduled runs of the monitor, and the JobCoordinator process is killed (for
example, due to a NodeManager failure) before the monitor's next run. YARN
reschedules the JobCoordinator process on a different NodeManager machine. The
StreamPartitionCountMonitor in the new application attempt of the
JobCoordinator is initialized with the already-changed partition count, so it
misses the partition count change.

B. Suppose the partition count of an input stream changes and the user stops
the Samza job before the monitor detects the change. The daemon thread spawned
in the next run of the Samza job does not have enough state to determine that
the partition count of the input streams has changed.

The general pattern is that if the JobCoordinator process is killed before the
monitor detects the partition count change, the in-memory state (the initial
partition counts) is lost, and the change goes undetected.
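
The failure mode in scenarios A and B can be reproduced with a minimal,
hypothetical model (not Samza's real API; the stream name "PageViewEvent" and
the partition counts are made up). Each JobCoordinator attempt builds a fresh
monitor whose baseline is read at startup, so a change that happens before the
restart is baked into the new baseline.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical demo: a monitor baseline held only in process memory. */
final class InMemoryBaselineDemo {

  /** Minimal monitor: remembers the counts it saw at construction time. */
  static final class Monitor {
    private final Supplier<Map<String, Integer>> fetch;
    private final Map<String, Integer> baseline;

    Monitor(Supplier<Map<String, Integer>> fetch) {
      this.fetch = fetch;
      this.baseline = new HashMap<>(fetch.get()); // captured once, never persisted
    }

    boolean changed() {
      return !fetch.get().equals(baseline);
    }
  }

  /** The first attempt survives until its next check after the repartition. */
  static boolean detectedWithoutRestart() {
    AtomicReference<Map<String, Integer>> counts =
        new AtomicReference<>(Collections.singletonMap("PageViewEvent", 4));
    Monitor attempt1 = new Monitor(counts::get);
    counts.set(Collections.singletonMap("PageViewEvent", 8)); // repartitioned
    return attempt1.changed();
  }

  /** The first attempt dies before its next check; a second attempt starts after the change. */
  static boolean detectedAfterRestart() {
    AtomicReference<Map<String, Integer>> counts =
        new AtomicReference<>(Collections.singletonMap("PageViewEvent", 4));
    Monitor attempt1 = new Monitor(counts::get); // baseline: 4 partitions
    counts.set(Collections.singletonMap("PageViewEvent", 8)); // repartitioned
    // attempt1 is killed (scenario A) or the job is stopped (scenario B)
    // before attempt1.changed() ever runs; its baseline is lost with it.
    Monitor attempt2 = new Monitor(counts::get); // baseline: 8 partitions
    return attempt2.changed();
  }

  public static void main(String[] args) {
    System.out.println("without restart: " + detectedWithoutRestart()); // true
    System.out.println("after restart:   " + detectedAfterRestart());   // false
  }
}
```

The second attempt compares the current counts against a baseline that already
includes the change, so the comparison never reports a difference.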

 

> StreamPartitionCountMonitor bug.
> --------------------------------
>
>                 Key: SAMZA-1979
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1979
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
