mjsax commented on code in PR #18765:
URL: https://github.com/apache/kafka/pull/18765#discussion_r1938076912
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
+ log.info("Streams client is in {}, all resources are being
closed and the client will be stopped.", state);
+ if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
Review Comment:
Not sure the logic is right? (I know you just moved it, but it was not right
to begin with.)
`setState(PENDING_SHUTDOWN)` above is an atomic compare-and-set, as we hold a
lock. So here, we only know we are in one of four states: `PENDING_SHUTDOWN`,
`NOT_RUNNING`, `PENDING_ERROR`, or `ERROR`.
After `isShuttingDown()` returns, the state could have transitioned to `ERROR`
before we get here (similarly for `PENDING_SHUTDOWN` below; we could be in
`NOT_RUNNING` already), so we would end up in the final `else`, log a WARN, and
return `false` incorrectly.
Should we combine both conditions into a single `waitOnState(ERROR ||
NOT_RUNNING)` (this needs a new `waitOnState` overload that takes two "target
states"), and change the log to `log.info("Streams client stopped to {}
completely", state);`?
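To make the suggestion concrete, here is a rough, self-contained sketch of such an overload. It is not the actual `KafkaStreams` code: the class `WaitOnStateSketch`, the method name `waitOnStates`, the `TERMINAL` constant, and the simplified `setState` (no transition validation) are all assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for KafkaStreams; only the pieces the sketch needs.
class WaitOnStateSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, PENDING_ERROR, NOT_RUNNING, ERROR }

    // Assumed helper constant: the two terminal states close() may wait for.
    static final Set<State> TERMINAL = EnumSet.of(State.NOT_RUNNING, State.ERROR);

    private final Object stateLock = new Object();
    private State state;

    WaitOnStateSketch(final State initial) {
        this.state = initial;
    }

    // Simplified setter (assumption): publishes the new state and wakes up waiters,
    // without validating the transition.
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    State state() {
        synchronized (stateLock) {
            return state;
        }
    }

    // Returns true as soon as the state is any of the targets,
    // false if waitMs elapses first. The check happens under the lock,
    // so a transition that completed before the call is not missed.
    boolean waitOnStates(final Set<State> targets, final long waitMs) {
        final long begin = System.currentTimeMillis();
        synchronized (stateLock) {
            boolean interrupted = false;
            long elapsedMs = 0L;
            try {
                while (!targets.contains(state)) {
                    if (waitMs <= elapsedMs) {
                        return false;
                    }
                    try {
                        stateLock.wait(waitMs - elapsedMs);
                    } catch (final InterruptedException e) {
                        // Keep waiting, but remember to restore the flag.
                        interrupted = true;
                    }
                    elapsedMs = System.currentTimeMillis() - begin;
                }
                return true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

With something like this, the rejected-transition branch in `close()` could call `waitOnStates(TERMINAL, timeoutMs)` once and log the state it ended up in, instead of first reading `state` and then waiting on a single target that may already have been passed.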
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
+ log.info("Streams client is in {}, all resources are being
closed and the client will be stopped.", state);
+ if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
+ log.info("Streams client stopped to ERROR completely");
+ return true;
+ } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+ log.info("Streams client stopped to NOT_RUNNING
completely");
+ return true;
+ } else {
+ log.warn("Streams client cannot transition to {}
completely within the timeout",
+ state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
+ return false;
+ }
+ }
+
+ if (state.hasCompletedShutdown()) {
Review Comment:
I think it would be simpler to just change this into a `wait` condition for
a terminal state (NOT_RUNNING or ERROR) with a timeout, and add an `else` that
logs a WARN and returns `false` if the timeout hits.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
Review Comment:
Seems we could still keep this condition? Even though I agree that we might
not need to handle this case upfront any longer.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -347,8 +347,9 @@ private boolean setState(final State newState) {
} else if (state == State.REBALANCING && newState ==
State.REBALANCING) {
// when the state is already in REBALANCING, it should not
transit to REBALANCING again
return false;
- } else if (state == State.ERROR && (newState ==
State.PENDING_ERROR || newState == State.ERROR)) {
- // when the state is already in ERROR, its transition to
PENDING_ERROR or ERROR (due to consecutive close calls)
+ } else if (state == State.ERROR && (newState ==
State.PENDING_ERROR || newState == State.ERROR || newState ==
State.PENDING_SHUTDOWN)) {
Review Comment:
> We need transitions of all shutdown states (PENDING_SHUTDOWN, NOT_RUNNING,
ERROR, PENDING_ERROR) to PENDING_SHUTDOWN to be non-fatal (rejecting transition
but not throwing)
For this case, we have only covered `PENDING_SHUTDOWN -> PENDING_SHUTDOWN` so
far (very first check). This PR currently adds `ERROR -> PENDING_SHUTDOWN`.
We would still miss `PENDING_ERROR -> PENDING_SHUTDOWN` below, as well as
`NOT_RUNNING -> PENDING_SHUTDOWN` above?
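As an illustration of what covering all four cases could look like, here is a minimal sketch. It is not the real `setState`: the class `ShutdownTransitionSketch` and the `SHUTDOWN_STATES` set are hypothetical, and every other transition is simply accepted here, whereas the real method validates transitions and may throw.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the state handling in KafkaStreams.
class ShutdownTransitionSketch {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Assumed grouping: the four states from which PENDING_SHUTDOWN must be
    // rejected without throwing.
    private static final Set<State> SHUTDOWN_STATES = EnumSet.of(
        State.PENDING_SHUTDOWN, State.NOT_RUNNING, State.PENDING_ERROR, State.ERROR);

    private final Object stateLock = new Object();
    private State state;

    ShutdownTransitionSketch(final State initial) {
        this.state = initial;
    }

    State state() {
        synchronized (stateLock) {
            return state;
        }
    }

    // Returns false (non-fatal rejection) if a shutdown is already in progress
    // or finished; otherwise applies the transition and returns true.
    boolean setState(final State newState) {
        synchronized (stateLock) {
            if (newState == State.PENDING_SHUTDOWN && SHUTDOWN_STATES.contains(state)) {
                // Consecutive close() calls end up here; reject but do not throw.
                return false;
            }
            // Simplification: the real code checks isValidTransition() first.
            state = newState;
            stateLock.notifyAll();
            return true;
        }
    }
}
```

Handling the four source states in one place would also make it obvious that a `false` return from `setState(PENDING_SHUTDOWN)` always means "already shutting down or stopped".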
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
+ log.info("Streams client is in {}, all resources are being
closed and the client will be stopped.", state);
+ if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
+ log.info("Streams client stopped to ERROR completely");
+ return true;
+ } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+ log.info("Streams client stopped to NOT_RUNNING
completely");
+ return true;
+ } else {
+ log.warn("Streams client cannot transition to {}
completely within the timeout",
+ state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
+ return false;
+ }
+ }
+
+ if (state.hasCompletedShutdown()) {
+ log.info("Streams client is already in the terminal {} state,
all resources are closed and the client has stopped.", state);
return true;
- } else {
- log.warn("Streams client cannot transition to {} completely
within the timeout",
- state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
- return false;
}
- }
- if (!setState(State.PENDING_SHUTDOWN)) {
// if we can't transition to PENDING_SHUTDOWN but not because
we're already shutting down, then it must be fatal
Review Comment:
I think this condition cannot be reached any longer, given the new logic?
After `if(!setState(State.PENDING_SHUTDOWN))` we know we are in one of the four
(pending) terminal states already.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
Review Comment:
Not sure if we still need/want this? Why do we care whether we are in a
"pending" state or already fully terminated? In the end, we could not
transition to `PENDING_SHUTDOWN`, and just need to wait for the terminal state.
What do we gain by keeping this code path? (Also cf. below: I think it's
error-prone anyway.)