GitHub user shanthoosh opened a pull request:

    https://github.com/apache/samza/pull/535

    Adding state-based validations in StreamProcessor before performing any lifecycle operation or group coordination.

    The existing StreamProcessor implementation doesn't have a state variable to represent its current state (whether it is in rebalance, shutdown, or running state). The absence of this information leads to the following two problems.
        
    When StreamProcessor.stop() and JobCoordinatorListener.onNewJobModel() execute concurrently, the SamzaContainer might still be running after the StreamProcessor has been stopped (due to the interleaved execution order).
    
    Here's a sample execution order:
    
       1. User thread invokes `SamzaContainer.stop()`.
       2. `StreamProcessor.stop()` stops the currently running SamzaContainer.
       3. Before StreamProcessor stops the ZkJobCoordinator, the ZkJobCoordinator initializes and executes a new SamzaContainer (due to a change in the global processor group).
       4. StreamProcessor stops the ZkJobCoordinator.
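    The interleaving above can be replayed deterministically in a single thread. The Java sketch below uses a hypothetical, heavily simplified `NaiveProcessor` (an illustration, not Samza's real StreamProcessor) whose `onNewJobModel()` starts a container without consulting any processor state; calling the steps in the listed order leaves a container running after shutdown has completed.

```java
// Entry point first, so `java RaceReplay.java` runs it directly.
class RaceReplay {
    public static void main(String[] args) {
        NaiveProcessor p = new NaiveProcessor();
        p.containerRunning = true; // processor is up and running a container
        p.stopContainer();         // step 2: user thread stops the container
        p.onNewJobModel();         // step 3: coordinator starts a new container
        p.stopCoordinator();       // step 4: user thread stops the coordinator
        System.out.println("container running after stop: " + p.containerRunning);
    }
}

// Hypothetical, simplified model of the pre-fix StreamProcessor: there is no
// state field, so a new job model always starts a container.
class NaiveProcessor {
    boolean containerRunning = false;
    boolean coordinatorRunning = true;

    // User thread, step 2: stop the currently running container.
    void stopContainer() {
        containerRunning = false;
    }

    // User thread, step 4: stop the job coordinator.
    void stopCoordinator() {
        coordinatorRunning = false;
    }

    // Coordinator thread, step 3: a group change produced a new job model.
    // Nothing checks whether stop() is already in progress.
    void onNewJobModel() {
        containerRunning = true;
    }
}
```

    Running `RaceReplay` prints `container running after stop: true`: the container started in step 3 outlives both the processor and its coordinator.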
        
    When StreamProcessor.stop() and JobCoordinatorListener.onJobModelExpired() execute concurrently, the StreamProcessor is not stopped cleanly.
    
    `paused` is a state variable held in SamzaContainer to indicate whether it has been stopped by the JobCoordinator in preparation for a new JobModel (by default, `paused` is set to `false` in `SamzaContainer`).
    
    Here's a sample execution order:
       1. User thread invokes `SamzaContainer.stop()`, which triggers `SamzaContainer.shutdown`. At this point, the user thread is waiting for `onContainerStop(paused=false)` (the container-stopped callback with paused = false).
       2. Before the SamzaContainer shuts down, the debounce thread invokes `onJobModelExpired` and triggers `SamzaContainer.pause()`, which sets the SamzaContainer's local state `paused` to true.
       3. SamzaContainer shuts down and triggers the container shutdown callback `onContainerStop(paused = true)`. Because paused is true in the onContainerStop callback, the StreamProcessor shutdown sequence is not triggered.
       4. StreamProcessor continues to participate in processor group coordination activities as if shutdown had not been triggered.
       5. `LocalApplicationRunner.waitForFinish()` invoked from the user thread blocks indefinitely.
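    Steps 1-3 can be modeled the same way. In this sketch, `PausedFlagContainer` and `PausedFlagProcessor` are hypothetical stand-ins (assumptions, not Samza classes) that keep only the `paused` flag and the shutdown decision taken in the container-stopped callback. The debounce thread's `pause()` lands between the shutdown request and the callback, so the processor never marks itself stopped.

```java
// Entry point first, so `java PausedRaceReplay.java` runs it directly.
class PausedRaceReplay {
    public static void main(String[] args) {
        PausedFlagProcessor p = new PausedFlagProcessor();
        p.container.shutdown();                // step 1: user thread requests shutdown
        p.container.pause();                   // step 2: debounce thread pauses
        p.onContainerStop(p.container.paused); // step 3: callback sees paused == true
        System.out.println("processor stopped: " + p.processorStopped);
    }
}

// Hypothetical stand-in for the pre-fix container: it owns the `paused` flag.
class PausedFlagContainer {
    boolean paused = false;            // default value, as in the description
    boolean shutdownRequested = false;

    void shutdown() {
        shutdownRequested = true;
    }

    void pause() {
        paused = true;                 // hides the fact that a shutdown is pending
    }
}

// Hypothetical stand-in for the pre-fix processor's shutdown decision.
class PausedFlagProcessor {
    final PausedFlagContainer container = new PausedFlagContainer();
    boolean processorStopped = false;

    // Container-stopped callback: only a non-paused stop ends the processor.
    void onContainerStop(boolean paused) {
        if (!paused) {
            processorStopped = true;   // would let waitForFinish() return
        }
        // paused == true is treated as a rebalance, so the processor keeps
        // coordinating (step 4) and waitForFinish() never returns (step 5).
    }
}
```

    Running `PausedRaceReplay` prints `processor stopped: false`. Without the `pause()` call in step 2, the same callback would stop the processor.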
        
    To solve the above problems, the following changes were made:
    * Add a state field to `StreamProcessor` to represent its current state. Before performing any StreamProcessor operation, its current state is checked, and the operation is performed only if it is valid in that state.
    * Remove the paused state from SamzaContainer; it is now covered by the StreamProcessor state itself.
    * Make onJobModelExpired the start state for any custom coordinators.
    * Interrupt the container thread on StreamProcessor.stop() if it is still running after the shutdown timeout.
    * Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.
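    A minimal sketch of the state-guarded approach, assuming a single lock and a hypothetical `State` enum. The state names, the `StatefulProcessor` class, and its method signatures are illustrative assumptions, not the merged code. Every transition checks the current state under the lock, so a late `onNewJobModel()` cannot start a container once `stop()` has begun, a late `onJobModelExpired()` cannot turn a shutdown into a rebalance, and the container thread is interrupted if it outlives the shutdown timeout.

```java
// Entry point first, so `java StatefulProcessorDemo.java` runs it directly.
class StatefulProcessorDemo {
    public static void main(String[] args) {
        StatefulProcessor p = new StatefulProcessor();
        p.start();                                 // NEW -> STARTED
        p.onJobModelExpired();                     // STARTED -> IN_REBALANCE
        p.onNewJobModel(() -> {                    // IN_REBALANCE -> RUNNING
            try {
                Thread.sleep(Long.MAX_VALUE);      // container that never finishes
            } catch (InterruptedException e) {
                // exits once stop() interrupts it after the timeout
            }
        });
        p.stop(100);                               // RUNNING -> STOPPING -> STOPPED
        boolean late = p.onNewJobModel(() -> { }); // rejected after stop()
        System.out.println(p.state() + " " + late);
    }
}

// Hypothetical lifecycle states; the names are assumptions for illustration.
enum State { NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

class StatefulProcessor {
    private final Object lock = new Object();
    private State state = State.NEW;
    private Thread containerThread;

    void start() {
        synchronized (lock) {
            if (state == State.NEW) state = State.STARTED;
        }
    }

    // Debounce thread: the group is rebalancing, so pause processing.
    void onJobModelExpired() {
        synchronized (lock) {
            if (state == State.STARTED || state == State.RUNNING) {
                state = State.IN_REBALANCE;
            } // ignored in STOPPING/STOPPED: a shutdown cannot become a rebalance
        }
    }

    // Coordinator thread: launch a container only while in rebalance.
    boolean onNewJobModel(Runnable containerBody) {
        synchronized (lock) {
            if (state != State.IN_REBALANCE) return false; // e.g. stop() already began
            containerThread = new Thread(containerBody, "samza-container");
            containerThread.start();
            state = State.RUNNING;
            return true;
        }
    }

    // Container-stopped callback: true means "continue the processor shutdown".
    // The decision comes from processor state, not from a container-owned flag.
    boolean onContainerStop() {
        synchronized (lock) { return state == State.STOPPING; }
    }

    // User thread. A real implementation would first ask the container to shut
    // down gracefully; this sketch only waits for the timeout, then interrupts.
    void stop(long shutdownTimeoutMs) {
        Thread toStop;
        synchronized (lock) {
            if (state == State.STOPPING || state == State.STOPPED) return;
            state = State.STOPPING;
            toStop = containerThread;
        }
        if (toStop != null) {
            try {
                toStop.join(shutdownTimeoutMs);
                if (toStop.isAlive()) {
                    toStop.interrupt();           // still running after the timeout
                    toStop.join(shutdownTimeoutMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt
            }
        }
        synchronized (lock) { state = State.STOPPED; }
    }

    State state() {
        synchronized (lock) { return state; }
    }
}
```

    Running `StatefulProcessorDemo` prints `STOPPED false`. Replaying the second scenario against this model: `stop()` moves the processor to `STOPPING` first, the debounce thread's `onJobModelExpired()` is then ignored, and `onContainerStop()` returns true, so shutdown proceeds. Keeping the decision in one state field owned by the processor, rather than in a flag owned by the container, is what closes the race in this sketch.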


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shanthoosh/samza abced

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/535.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #535
    
----
commit a0260388f40563b926f5f1f9e6f94ecd618cea43
Author: Shanthoosh Venkataraman <santhoshvenkat1988@...>
Date:   2018-05-09T19:01:03Z

    Minor code cleanup.

commit 85031549a46b63737985a651dd8312f9bdf9cbee
Author: Shanthoosh Venkataraman <santhoshvenkat1988@...>
Date:   2018-05-16T00:47:04Z

    Changes
    
    * Rebase with recent master to resolve merge conflicts.
    * Make jcContainerShutdownLatch volatile.

commit 299e006fbdcffc46b596ff0c379a1e91f31fe26a
Author: Shanthoosh Venkataraman <santhoshvenkat1988@...>
Date:   2018-05-17T22:01:05Z

    Adding state-based validations in StreamProcessor before performing any lifecycle operation or group coordination.
    
    The existing StreamProcessor implementation doesn't have a state variable to represent its current state (whether it is in rebalance, shutdown, or running state). The absence of this information leads to the following two problems.
    
    When StreamProcessor.stop() and JobCoordinatorListener.onNewJobModel() execute concurrently, the SamzaContainer might still be running after the StreamProcessor has been stopped (due to the interleaved execution order). Here's a sample execution order:
          1. User thread invokes `SamzaContainer.stop()`.
          2. `StreamProcessor.stop()` stops the currently running SamzaContainer.
          3. Before StreamProcessor stops the ZkJobCoordinator, the ZkJobCoordinator initializes and executes a new SamzaContainer (due to a change in the global processor group).
          4. StreamProcessor stops the ZkJobCoordinator.
    
    When StreamProcessor.stop() and JobCoordinatorListener.onJobModelExpired() execute concurrently, the StreamProcessor is not stopped cleanly.
          `paused` is a state variable held in SamzaContainer to indicate whether it has been stopped by the JobCoordinator in preparation for a new JobModel (by default, `paused` is set to `false` in `SamzaContainer`). Here's a sample execution order:

          1. User thread invokes `SamzaContainer.stop()`, which triggers `SamzaContainer.shutdown`. At this point, the user thread is waiting for `onContainerStop(paused=false)` (the container-stopped callback with paused = false).
          2. Before the SamzaContainer shuts down, the debounce thread invokes `onJobModelExpired` and triggers `SamzaContainer.pause()`, which sets the SamzaContainer's local state `paused` to true.
          3. SamzaContainer shuts down and triggers the container shutdown callback `onContainerStop(paused = true)`. Because paused is true in the onContainerStop callback, the StreamProcessor shutdown sequence is not triggered.
          4. StreamProcessor continues to participate in processor group coordination activities as if shutdown had not been triggered.
          5. `LocalApplicationRunner.waitForFinish()` blocks indefinitely.
    
    To solve the above problems, the following changes were made to StreamProcessor:
    i. Add a state field to `StreamProcessor` to represent its current state. Before performing any StreamProcessor operation, its current state is checked, and the operation is performed only if it is valid in that state.
    ii. Remove the paused state from SamzaContainer; it is now covered by the StreamProcessor state itself.
    iii. Make onJobModelExpired the start state for any custom coordinators.
    iv. Interrupt the container thread on StreamProcessor.stop() if it is still running after the shutdown timeout.
    v. Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.

----

