mjsax commented on code in PR #18765:
URL: https://github.com/apache/kafka/pull/18765#discussion_r1940327370
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,38 +1547,30 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
- return true;
+ if (!setState(State.PENDING_SHUTDOWN)) {
+ final State stateCopy = state;
+ if (stateCopy.isShuttingDown()) {
+ log.info("Skipping shutdown since Streams client is already in {}, waiting for a terminal state", stateCopy);
+ if (!waitOnStates(timeoutMs, State.ERROR, State.NOT_RUNNING)) {
+ log.warn("Streams client did transition to a terminal state (ERROR or NOT_RUNNING) within the {}ms timeout", timeoutMs);
+ return false;
+ }
+ log.info("Streams client stopped completely and transitioned to the terminal {} state", state);
+ } else if (stateCopy.hasCompletedShutdown()) {
+ log.info("Skipping shutdown since Streams client is already in the terminal {} state", stateCopy);
Review Comment:
I think the code is correct, but it is hard to reason about unless you think it through very carefully...
If `stateCopy.isShuttingDown() == false`, we know we are in a terminal state (`NOT_RUNNING` or `ERROR`), and thus it should hold that `state == stateCopy`.
We start with `if (stateCopy.isShuttingDown())`, so using `else if (stateCopy.hasCompletedShutdown())` is consistent and easy to read. However, above we log `log.info("Streams client stopped completely and transitioned to the terminal {} state", state);` (which is correct; using `stateCopy` there would be incorrect), and that feels inconsistent with this log line, which uses `stateCopy` where we could just as well use `state`. If we use `state` here, it raises the question of whether we should also write `else if (state.hasCompletedShutdown())`, but that makes it inconsistent with `if (stateCopy.isShuttingDown())`.
Am I overthinking this? My impression is that no matter what we do, something feels inconsistent...?
Idea: rewrite to make it easier to read:
```
final State stateCopy = state;
if (stateCopy.isShuttingDown()) {
    log.info("Skipping shutdown since Streams client is already in {}, waiting for a terminal state", stateCopy);
    if (!waitOnStates(timeoutMs, State.ERROR, State.NOT_RUNNING)) {
        log.warn("Streams client did not transition to a terminal state (ERROR or NOT_RUNNING) within the {}ms timeout", timeoutMs);
        return false;
    }
    log.info("Streams client stopped completely and transitioned to the terminal {} state", state);
    return true;
}
if (state.hasCompletedShutdown()) {
    log.info("Skipping shutdown since Streams client is already in the terminal {} state", state);
    return true;
}
throw new IllegalStateException("If transitioning to PENDING_SHUTDOWN fails, the state should be either in "
    + "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
```
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -308,7 +310,8 @@ private boolean waitOnState(final State targetState, final long waitMs) {
interrupted = true;
}
} else {
- log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+ log.debug("Cannot transit to {} within {}ms",
+ Arrays.stream(targetStates).map(State::toString).collect(Collectors.joining(" or ")), waitMs);
Review Comment:
Nit: formatting. We usually either put the whole call on a single line or pass one parameter per line; otherwise the code is very hard to read.
```
log.debug(
    "Cannot transit to {} within {}ms",
    Arrays.stream(targetStates).map(State::toString).collect(Collectors.joining(" or ")),
    waitMs
);
```
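For reference, a minimal self-contained sketch of what the joined argument renders to. The `State` enum here is a hypothetical stand-in for `KafkaStreams.State`, and `describe` is a hypothetical helper; only the `Arrays.stream(...).map(State::toString).collect(Collectors.joining(" or "))` expression is taken from the diff.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class JoinTargetStates {
    // Hypothetical stand-in for KafkaStreams.State; only the constant names matter here.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Joins the varargs target states the same way as the debug-log argument in the diff.
    static String describe(final State... targetStates) {
        return Arrays.stream(targetStates)
            .map(State::toString)
            .collect(Collectors.joining(" or "));
    }

    public static void main(final String[] args) {
        // Prints: Cannot transit to ERROR or NOT_RUNNING within 30000ms
        System.out.printf("Cannot transit to %s within %dms%n",
            describe(State.ERROR, State.NOT_RUNNING), 30000L);
    }
}
```

With a single target state the join degenerates to just that state's name, so the message stays readable for the old single-state callers as well.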
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -347,8 +347,9 @@ private boolean setState(final State newState) {
} else if (state == State.REBALANCING && newState == State.REBALANCING) {
// when the state is already in REBALANCING, it should not transit to REBALANCING again
return false;
- } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) {
- // when the state is already in ERROR, its transition to PENDING_ERROR or ERROR (due to consecutive close calls)
+ } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR || newState == State.PENDING_SHUTDOWN)) {
Review Comment:
Ah. Bad code... we use `if (state == State.PENDING_ERROR...` twice. Should we unify this and have a single `if (state == ...)` for each state instead of multiple (or use a `switch`)?
There also seems to be some inconsistency with regard to "idempotent transitions", i.e., `oldState == newState`?
For `NOT_RUNNING`, `ERROR`, `PENDING_ERROR`, `PENDING_SHUTDOWN`, and `REBALANCING`, we cover it here (even if it is hard to decode...) and return `false`, if I read the code correctly.
`CREATED -> CREATED` would be invalid and we fail (sounds right to me, as this should never happen).
For `RUNNING -> RUNNING`, we encode it as a "valid transition" and return `true`. Fine. But what is the (semantic) difference to `REBALANCING -> REBALANCING`, which is not considered valid (we return `false` for it)? Same for `PENDING_ERROR` and `PENDING_SHUTDOWN`?
I guess `NOT_RUNNING` and `ERROR` fall into the `CREATED` category and should never happen (so why do we not fail for them? And if we do not want to fail, why return `false` instead of just making them "valid transitions"?).
I am confused.
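One possible shape for the "single branch per state" idea, as a sketch only. `classify`, `Rule`, and `Outcome` are hypothetical names, the `State` enum is a stand-in, and the valid/ignored sets are illustrative assumptions loosely based on the cases discussed above (for example, `ERROR` ignoring `PENDING_ERROR`, `ERROR`, and `PENDING_SHUTDOWN`, as in the diff); they are not the actual rules in `KafkaStreams`.

```java
import java.util.EnumSet;
import java.util.Set;

public class StateTransitions {
    // Hypothetical stand-in for KafkaStreams.State (constant names only).
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // APPLY: perform the transition; IGNORE: no-op (setState() would return false); FAIL: illegal request.
    enum Outcome { APPLY, IGNORE, FAIL }

    // Accepted and ignored target states for one current state.
    private static final class Rule {
        final Set<State> valid;
        final Set<State> ignored;

        Rule(final Set<State> valid, final Set<State> ignored) {
            this.valid = valid;
            this.ignored = ignored;
        }
    }

    // Exactly one branch per current state. The sets are illustrative assumptions,
    // not copied from KafkaStreams.
    private static Rule ruleFor(final State current) {
        switch (current) {
            case CREATED:
                return new Rule(EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN),
                                EnumSet.noneOf(State.class));
            case REBALANCING:
                return new Rule(EnumSet.of(State.RUNNING, State.PENDING_SHUTDOWN, State.PENDING_ERROR),
                                EnumSet.of(State.REBALANCING));
            case RUNNING:
                return new Rule(EnumSet.of(State.REBALANCING, State.RUNNING, State.PENDING_SHUTDOWN, State.PENDING_ERROR),
                                EnumSet.noneOf(State.class));
            case PENDING_SHUTDOWN:
                return new Rule(EnumSet.of(State.NOT_RUNNING),
                                EnumSet.of(State.PENDING_SHUTDOWN, State.PENDING_ERROR));
            case NOT_RUNNING:
                return new Rule(EnumSet.noneOf(State.class),
                                EnumSet.of(State.NOT_RUNNING, State.PENDING_SHUTDOWN, State.PENDING_ERROR));
            case PENDING_ERROR:
                return new Rule(EnumSet.of(State.ERROR),
                                EnumSet.of(State.PENDING_ERROR, State.PENDING_SHUTDOWN));
            case ERROR:
                return new Rule(EnumSet.noneOf(State.class),
                                EnumSet.of(State.ERROR, State.PENDING_ERROR, State.PENDING_SHUTDOWN));
            default:
                throw new IllegalArgumentException("Unknown state: " + current);
        }
    }

    // Looks up the rule for the current state and classifies the requested transition.
    static Outcome classify(final State current, final State next) {
        final Rule rule = ruleFor(current);
        if (rule.valid.contains(next)) {
            return Outcome.APPLY;
        }
        if (rule.ignored.contains(next)) {
            return Outcome.IGNORE;
        }
        return Outcome.FAIL;
    }
}
```

With every state's accepted and ignored targets listed side by side, the asymmetry raised above (`RUNNING -> RUNNING` applied vs. `REBALANCING -> REBALANCING` ignored) sits in two adjacent branches instead of being spread across several `if` conditions.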
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,38 +1547,30 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
- return true;
+ if (!setState(State.PENDING_SHUTDOWN)) {
+ final State stateCopy = state;
Review Comment:
Might be worth adding a comment explaining why we do this; otherwise, somebody might remove it during "code cleanup"? In the end, I think it's not just about logging, but also about `isShuttingDown()`: using a copy makes the call safe; otherwise, the call would be subject to a race?
Should we call it `currentState` or `immutableState` or `stateCopyToCheckIsShuttingDownAtomically` instead?
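To illustrate the race concern, a minimal sketch of the snapshot pattern, assuming a `volatile` state field that other threads update. `StateSnapshot`, `describeCloseFallback`, and `transitionTo` are hypothetical names, and the `State` enum is a stand-in whose `isShuttingDown()` / `hasCompletedShutdown()` follow the semantics described in this review (`PENDING_SHUTDOWN`/`PENDING_ERROR` vs. `NOT_RUNNING`/`ERROR`).

```java
public class StateSnapshot {
    // Hypothetical stand-in for KafkaStreams.State, mirroring the semantics discussed above.
    enum State {
        CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR;

        boolean isShuttingDown() {
            return this == PENDING_SHUTDOWN || this == PENDING_ERROR;
        }

        boolean hasCompletedShutdown() {
            return this == NOT_RUNNING || this == ERROR;
        }
    }

    // Assumed to be written by other threads (e.g. PENDING_ERROR -> ERROR); volatile so reads see the latest value.
    private volatile State state;

    StateSnapshot(final State initial) {
        this.state = initial;
    }

    // Hypothetical setter standing in for a concurrent transition.
    void transitionTo(final State newState) {
        state = newState;
    }

    String describeCloseFallback() {
        // Read the volatile field exactly once: both checks and the returned message
        // then refer to the same value, even if another thread updates `state` meanwhile.
        final State currentState = state;
        if (currentState.isShuttingDown()) {
            return "already in " + currentState + ", waiting for a terminal state";
        }
        if (currentState.hasCompletedShutdown()) {
            return "already in the terminal " + currentState + " state";
        }
        // Without the snapshot, two separate reads of `state` could straddle a concurrent
        // transition, so the branch taken might not match the state that gets reported.
        throw new IllegalStateException("Unexpected state: " + currentState);
    }
}
```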