[
https://issues.apache.org/jira/browse/SAMZA-2510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084250#comment-17084250
]
Bharath Kumarasubramanian commented on SAMZA-2510:
--------------------------------------------------
The following scenario could still result in a different status being
propagated upstream, depending on the order in which the process callback
thread and the run loop thread execute:
# Async processing has a bad message in flight
# Manual call to {{RunLoop.shutdown}}
# Exit main while loop
# {{throwable}} is still null since async processing is incomplete
# Async processing throws exception
# Container will exit with "success"
Note that this is in line with the current semantics of external shutdown
requests to the run loop. The semantics of a manual call to
{{RunLoop.shutdown}} are loosely defined: it does not wait for in-flight work
to finish but triggers shutdown as soon as the run loop thread sees the
request. Hence, the status code will differ depending on whether the exception
was set before the run loop thread exited.
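
To make the ordering concrete, here is a minimal sketch of the scenario. It is
not the actual RunLoop code: the class {{ShutdownRaceSketch}} and the method
{{reportedStatus}} are hypothetical, and the {{shutdownNow}} flag and
{{throwable}} slot are simplified stand-ins for the fields discussed in this
issue. A latch forces the callback to fail either before shutdown is requested
or only after the loop has already exited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the run loop; not Samza's RunLoop.
class ShutdownRaceSketch {
    private volatile boolean shutdownNow = false;
    private final AtomicReference<Throwable> throwable = new AtomicReference<>();

    // Called by whoever requests an external shutdown.
    void shutdown() {
        shutdownNow = true;
    }

    // Called by the process callback thread when async processing fails.
    void onFailure(Throwable t) {
        throwable.compareAndSet(null, t);
    }

    // Loops until shutdown or failure, then checks the failure slot once
    // before exiting. Returns true for "success".
    boolean runLoop() {
        while (!shutdownNow && throwable.get() == null) {
            Thread.yield(); // placeholder for choosing and dispatching messages
        }
        return throwable.get() == null;
    }

    // Runs one scenario and returns the status the loop reported.
    // failureBeforeExit == true:  the callback fails before shutdown is requested.
    // failureBeforeExit == false: the callback fails only after the loop exited.
    static boolean reportedStatus(boolean failureBeforeExit) {
        ShutdownRaceSketch loop = new ShutdownRaceSketch();
        CountDownLatch loopExited = new CountDownLatch(1);
        ExecutorService callbackPool = Executors.newSingleThreadExecutor();
        try {
            Future<?> callback = callbackPool.submit(() -> {
                if (!failureBeforeExit) {
                    loopExited.await(); // message is still in flight
                }
                loop.onFailure(new RuntimeException("bad message"));
                return null;
            });
            if (failureBeforeExit) {
                callback.get(5, TimeUnit.SECONDS); // failure is recorded first
            }
            loop.shutdown();                 // manual shutdown request
            boolean success = loop.runLoop(); // exits without waiting for in-flight work
            loopExited.countDown();          // only now may the late failure arrive
            callback.get(5, TimeUnit.SECONDS);
            return success;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            callbackPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("failure before exit -> success=" + reportedStatus(true));
        System.out.println("failure after exit  -> success=" + reportedStatus(false));
    }
}
```

In this sketch the final check of {{throwable}} catches a failure that was
recorded before the loop exited, but a failure that arrives afterwards is
never observed, so the reported status depends purely on timing.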
> Incorrect shutdown status due to race between runloop thread and process
> callback thread
> ----------------------------------------------------------------------------------------
>
> Key: SAMZA-2510
> URL: https://issues.apache.org/jira/browse/SAMZA-2510
> Project: Samza
> Issue Type: Bug
> Reporter: Bharath Kumarasubramanian
> Assignee: Bharath Kumarasubramanian
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> *Problem*: A race between the process callback thread and the runloop thread
> can result in incorrect shutdown status.
> *Description*: Currently runloop performs message choosing and dispatching in
> a loop indefinitely until interrupted by an external shutdown request or an
> exception during processing. In async mode, message completion is notified by
> the process callback thread using `onComplete()` and `onFailure()` to
> represent corresponding success and failure. The failure callback updates the
> exception code within the runloop to notify processing failures. Similarly,
> shutdown can be requested by user code or end-of-stream messages through
> various scopes (task level, container level). These requests are signalled to
> the runloop through the shutdownNow flag.
> Currently, as long as the shutdownNow flag is not set, the exception code is
> promptly seen by the runloop and propagated upstream correctly. However, if a
> shutdown is requested, the runloop doesn't check the exception code before
> exiting.
> *Fix*:
> # Make sure we check the exception code before we exit.
> # Set the exception code on process failure before updating the state to
> done.
> *Stacktrace*
> {code:java}
> at
> org.apache.samza.operators.impl.OperatorImpl.onMessageAsync(OperatorImpl.java:173)
> ~[samza-core_2.11-310.1040.0.18.jar:310.1040.0.18]
> ... 787 more
> 2020-04-09 21:20:32.271 [pool-3-thread-1] RunLoop [ERROR] Got callback
> failure for task Partition 0
> at
> org.apache.samza.operators.impl.OperatorImpl.lambda$onWatermark$25(OperatorImpl.java:416)
> ~[samza-core_2.11-310.1040.0.18.jar:310.1040.0.18]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> ~[?:1.8.0_172]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ~[?:1.8.0_172]
> ... 23 more
> 2020-04-09 21:20:32.295 [Samza StreamProcessor Container Thread-0]
> CoordinatorRequests [INFO] Shutdown has now been requested by tasks
> [Partition 0]
> 2020-04-09 21:20:32.295 [Samza StreamProcessor Container Thread-0]
> CoordinatorRequests [INFO] Shutdown requested.
> 2020-04-09 21:20:32.296 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down SamzaContainer.
> 2020-04-09 21:20:32.296 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down consumer multiplexer.
> 2020-04-09 21:20:32.297 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down task instance stream tasks.
> 2020-04-09 21:20:32.297 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down timer executor
> 2020-04-09 21:20:32.512 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down task instance table manager.
> 2020-04-09 21:20:32.514 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down container storage manager.
> 2020-04-09 21:20:32.520 [Samza StreamProcessor Container Thread-0]
> ContainerStorageManager [INFO] Shutdown complete
> 2020-04-09 21:20:32.521 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down host statistics monitor.
> 2020-04-09 21:20:32.522 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down producer multiplexer.
> 2020-04-09 21:20:32.522 [Samza StreamProcessor Container Thread-0]
> InMemorySystemProducer [INFO] Stopping in memory system producer for
> in-memory-test-system
> 2020-04-09 21:20:32.523 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down offset manager.
> 2020-04-09 21:20:32.524 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down metrics reporters.
> 2020-04-09 21:20:32.525 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down JVM metrics.
> 2020-04-09 21:20:32.525 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutting down admin multiplexer.
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> SamzaExecutionContext [INFO] Closed controlServer
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> SamzaExecutionContext [INFO] Closed dataServer
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> SamzaExecutionContext [INFO] Closed stateServer
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> SamzaExecutionContext [INFO] Closed jobBundle
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [INFO] Shutdown complete.
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> StreamProcessor [INFO] Container:
> org.apache.samza.container.SamzaContainer@73b63917 stopped. Stopping the
> stream processor: 0.
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> StreamProcessor [INFO] Ignoring onJobModelExpired invocation since the
> current state is STOPPING and not in [RUNNING, STARTED, IN_REBALANCE].
> 2020-04-09 21:20:32.526 [Samza StreamProcessor Container Thread-0]
> StreamProcessor [INFO] Shutting down the executor service of the stream
> processor: 0.
> 2020-04-09 21:20:32.527 [Samza StreamProcessor Container Thread-0]
> SamzaContainer [WARN] Shutdown is no-op since the container is already in
> state: STOPPED
> 2020-04-09 21:20:32.527 [Samza StreamProcessor Container Thread-0]
> StreamProcessor [INFO] Waiting 30000 ms for the container:
> org.apache.samza.container.SamzaContainer@73b63917 to shutdown.
> 2020-04-09 21:20:32.527 [Samza StreamProcessor Container Thread-0]
> StreamProcessor [INFO] Shutdown status of container:
> org.apache.samza.container.SamzaContainer@73b63917 for stream processor: 0
> is: true.{code}