[ 
https://issues.apache.org/jira/browse/SAMZA-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shanthoosh Venkataraman updated SAMZA-1730:
-------------------------------------------
    Description: 
The existing StreamProcessor implementation has no state variable to 
represent its current state (rebalance, shutdown, or running). 
The absence of this information leads to the following two problems.
 - When StreamProcessor.stop() and JobCoordinatorListener.onNewJobModel() 
execute concurrently, a SamzaContainer might still be running after the 
StreamProcessor has been stopped, because of the interleaved execution 
order. Here's a sample execution order:

                 1. User thread invokes `SamzaContainer.stop()`.
                 2. `StreamProcessor.stop()` stops the currently running 
SamzaContainer.
                 3. Before StreamProcessor stops the ZkJobCoordinator, 
ZkJobCoordinator initializes and executes a new SamzaContainer (due to a 
change in the global processor group).
                 4. StreamProcessor stops the ZkJobCoordinator.
 - When StreamProcessor.stop() and 
JobCoordinatorListener.onJobModelExpired() execute concurrently, the 
StreamProcessor will not be stopped cleanly.

                `paused` is a state variable held in SamzaContainer that 
indicates whether the container was stopped by the JobCoordinator to make way 
for a new JobModel (by default, `paused` is `false` in `SamzaContainer`). 
Here's a sample execution order:

                 1. User thread invokes `SamzaContainer.stop()` and triggers 
`SamzaContainer.shutdown`. At this point, the user thread is waiting for 
`onContainerStop(paused=false)` (the container-stopped callback with 
paused = false).
                 2. Before the SamzaContainer has shut down, the debounce 
thread invokes `onJobModelExpired` and triggers `SamzaContainer.pause()`. 
`SamzaContainer.pause()` sets the SamzaContainer local state `paused` to true.
                 3. SamzaContainer shuts down and triggers the container 
shutdown callback `onContainerStop(paused=true)`. When paused is true in the 
onContainerStop callback, the StreamProcessor shutdown sequence is not 
triggered.
                 4. StreamProcessor keeps participating in the processor 
group coordination activities as if shutdown had not been triggered.
                 5. LocalApplicationRunner.waitForFinish blocks 
indefinitely.
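
The second interleaving can be replayed deterministically on a single thread. 
What follows is a minimal Java sketch, not Samza code: `ToyContainer`, 
`ToyProcessor`, `ContainerStopListener` and `PausedFlagRaceReplay` are 
hypothetical stand-ins, and only the `paused` flag and the `onContainerStop` 
callback name are taken from the steps above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the container-stopped callback; not the Samza interface.
interface ContainerStopListener {
    void onContainerStop(boolean paused);
}

// Toy container whose shutdown callback reports a mutable `paused` flag.
class ToyContainer {
    private boolean paused = false; // default value, as in the description
    private final ContainerStopListener listener;

    ToyContainer(ContainerStopListener listener) {
        this.listener = listener;
    }

    // What the debounce thread triggers via onJobModelExpired.
    void pause() {
        paused = true;
    }

    // The container finishes shutting down and reports whatever `paused` holds now.
    void completeShutdown() {
        listener.onContainerStop(paused);
    }
}

// Toy processor: runs its shutdown sequence only when the callback says paused == false.
class ToyProcessor implements ContainerStopListener {
    final List<String> events = new ArrayList<>();
    boolean shutdownSequenceRan = false;

    @Override
    public void onContainerStop(boolean paused) {
        events.add("onContainerStop(paused=" + paused + ")");
        if (!paused) {
            shutdownSequenceRan = true;
        }
    }
}

class PausedFlagRaceReplay {
    // Replays steps 1-3 in order; `rebalanceInterleaves` decides whether step 2 happens.
    static ToyProcessor replay(boolean rebalanceInterleaves) {
        ToyProcessor processor = new ToyProcessor();
        ToyContainer container = new ToyContainer(processor);
        // Step 1: the user thread has requested stop; shutdown is in progress.
        if (rebalanceInterleaves) {
            container.pause();        // Step 2: debounce thread flips `paused`.
        }
        container.completeShutdown(); // Step 3: callback carries the flipped flag.
        return processor;
    }
}
```

With `replay(false)` the toy processor runs its shutdown sequence; with 
`replay(true)` it does not, which corresponds to steps 4 and 5.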

To solve the above problems, the following changes were made to StreamProcessor:
 * Add a state field to `StreamProcessor` to represent its current state. 
Before any StreamProcessor operation is performed, the current state is 
checked, and the operation runs only if the state is valid for it.
 * Remove the paused state from SamzaContainer, since it is covered by the 
StreamProcessor state itself.
 * Make onJobModelExpired the start state for any custom JobCoordinators. 
 * Interrupt the container thread on StreamProcessor.stop() if it is still 
running after the shutdown timeout.
 * Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.
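
The state check and the timeout-based interrupt listed above could look 
roughly like the following Java sketch. It is an illustration under 
assumptions, not the code from the patch: the `ProcessorState` names, 
`GuardedProcessor`, and `ContainerThreadStopper` are hypothetical, and 
treating onJobModelExpired as the only callback accepted in the initial state 
is one reading of "start state".

```java
// Hypothetical state names; the issue itself only mentions rebalance, shutdown and running.
enum ProcessorState { NEW, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

class GuardedProcessor {
    private final Object lock = new Object();
    private ProcessorState state = ProcessorState.NEW;
    private int containersStarted = 0;

    ProcessorState state() {
        synchronized (lock) { return state; }
    }

    int containersStarted() {
        synchronized (lock) { return containersStarted; }
    }

    // Accepted from NEW (assumed start state) or RUNNING; ignored otherwise.
    boolean onJobModelExpired() {
        synchronized (lock) {
            if (state != ProcessorState.NEW && state != ProcessorState.RUNNING) {
                return false;
            }
            state = ProcessorState.IN_REBALANCE;
            return true;
        }
    }

    // Starts a container only while a rebalance is in progress.
    boolean onNewJobModel() {
        synchronized (lock) {
            if (state != ProcessorState.IN_REBALANCE) {
                return false; // e.g. stop() already ran: no new container.
            }
            containersStarted++;
            state = ProcessorState.RUNNING;
            return true;
        }
    }

    // Moves to STOPPING from any state that is not already stopping or stopped.
    boolean stop() {
        synchronized (lock) {
            if (state == ProcessorState.STOPPING || state == ProcessorState.STOPPED) {
                return false;
            }
            state = ProcessorState.STOPPING;
            return true;
        }
    }

    // The container-stopped callback no longer needs a `paused` argument.
    void onContainerStop() {
        synchronized (lock) {
            if (state == ProcessorState.STOPPING) {
                state = ProcessorState.STOPPED;
            }
        }
    }
}

class ContainerThreadStopper {
    // Waits up to timeoutMs (must be > 0) for the thread to finish and
    // interrupts it if it is still alive. Returns true if an interrupt was sent.
    static boolean stopWithin(Thread containerThread, long timeoutMs) {
        try {
            containerThread.join(timeoutMs);
            if (!containerThread.isAlive()) {
                return false;
            }
            containerThread.interrupt();
            containerThread.join(timeoutMs);
            return true;
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: restore its flag and still signal the container.
            Thread.currentThread().interrupt();
            containerThread.interrupt();
            return true;
        }
    }
}
```

In this sketch, a call to `onNewJobModel()` that arrives after `stop()` is 
rejected, so no new container is started once shutdown has begun.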

  was:
The same description as above, except that the third change read: "Make 
onJobModelExpired as start state for any custom coordinators."


> Add state validations in StreamProcessor.
> -----------------------------------------
>
>                 Key: SAMZA-1730
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1730
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
