lucasbru commented on code in PR #18765:
URL: https://github.com/apache/kafka/pull/18765#discussion_r1940838447
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -347,8 +347,9 @@ private boolean setState(final State newState) {
} else if (state == State.REBALANCING && newState == State.REBALANCING) {
// when the state is already in REBALANCING, it should not transit to REBALANCING again
return false;
- } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) {
- // when the state is already in ERROR, its transition to PENDING_ERROR or ERROR (due to consecutive close calls)
+ } else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR || newState == State.PENDING_SHUTDOWN)) {
Review Comment:
> Bad code... using if (state == State.PENDING_ERROR... twice...

I think you are confused by the diff. There is only one such case.

> I am confused.

I am as well. This code looks pretty organically grown and somewhat
inconsistent, and it would probably be finicky to make consistent and clean up.
Given that the scope of this PR is fixing a specific race condition, I don't feel
like overhauling the state machine now, unless there is a concrete problem we
want to fix.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,38 +1547,30 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
- return true;
+ if (!setState(State.PENDING_SHUTDOWN)) {
+ final State stateCopy = state;
+ if (stateCopy.isShuttingDown()) {
+ log.info("Skipping shutdown since Streams client is already in {}, waiting for a terminal state", stateCopy);
+ if (!waitOnStates(timeoutMs, State.ERROR, State.NOT_RUNNING)) {
+ log.warn("Streams client did transition to a terminal state (ERROR or NOT_RUNNING) within the {}ms timeout", timeoutMs);
+ return false;
+ }
+ log.info("Streams client stopped completely and transitioned to the terminal {} state", state);
+ } else if (stateCopy.hasCompletedShutdown()) {
+ log.info("Skipping shutdown since Streams client is already in the terminal {} state", stateCopy);
Review Comment:
This is fine with me. I also moved the `return` statements around a couple of
times, trying to clarify it. We are concurrently trying to advance a state
machine, which ideally should be single-threaded (I don't intend to overhaul
this in this PR, though), so a certain amount of complexity is inherent. Your
point about the usage of `state` vs. `stateCopy` is valid. Updated.
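
To illustrate the `state` vs. `stateCopy` point, here is a minimal standalone sketch, not the actual `KafkaStreams` code. The class `StateSnapshotSketch`, its reduced `State` enum, and the `describe()` helper are hypothetical; `waitOnStates` only mirrors the name used in the diff and its body is an assumption. The idea is to read the volatile field once into a local, so that every branch of a decision sees the same value even if another thread advances the state in between.

```java
import java.util.Arrays;

class StateSnapshotSketch {
    // Reduced, hypothetical version of the client states discussed in the review.
    enum State {
        RUNNING, PENDING_SHUTDOWN, PENDING_ERROR, NOT_RUNNING, ERROR;

        boolean isShuttingDown() {
            return this == PENDING_SHUTDOWN || this == PENDING_ERROR;
        }

        boolean hasCompletedShutdown() {
            return this == NOT_RUNNING || this == ERROR;
        }
    }

    private final Object stateLock = new Object();
    private volatile State state = State.RUNNING;

    // Unconditional transition; a real state machine would validate it first.
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Waits until the state is any of targetStates, or the timeout elapses.
    boolean waitOnStates(final long waitMs, final State... targetStates) {
        final long now = System.currentTimeMillis();
        // Guard against overflow for very large timeouts such as Long.MAX_VALUE.
        final long deadline = waitMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + waitMs;
        boolean interrupted = false;
        try {
            synchronized (stateLock) {
                while (!Arrays.asList(targetStates).contains(state)) {
                    final long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false;
                    }
                    try {
                        stateLock.wait(remaining);
                    } catch (final InterruptedException e) {
                        interrupted = true;
                    }
                }
                return true;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Single volatile read: all checks below operate on the same snapshot.
    String describe() {
        final State stateCopy = state;
        if (stateCopy.isShuttingDown()) {
            return "shutting down: " + stateCopy;
        } else if (stateCopy.hasCompletedShutdown()) {
            return "terminal: " + stateCopy;
        }
        return "active: " + stateCopy;
    }
}
```

Reading `state` separately in each branch could, under concurrency, let the first check see `PENDING_ERROR` and a later one see `ERROR`, so neither branch matches as intended. The local copy avoids that particular inconsistency, although it does not make the state machine single-threaded.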
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -308,7 +310,8 @@ private boolean waitOnState(final State targetState, final long waitMs) {
interrupted = true;
}
} else {
- log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
+ log.debug("Cannot transit to {} within {}ms",
+ Arrays.stream(targetStates).map(State::toString).collect(Collectors.joining(" or ")), waitMs);
Review Comment:
Done
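
For reference, a small standalone sketch of what the joined log argument in the hunk above evaluates to. The class `TargetStatesMessageSketch`, the reduced `State` enum, and the `describeTargets` helper are hypothetical names used only for illustration; the stream expression is the one from the diff.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class TargetStatesMessageSketch {
    // Hypothetical two-value enum standing in for the terminal client states.
    enum State { NOT_RUNNING, ERROR }

    // Joins the target states into a human-readable "A or B" string.
    static String describeTargets(final State... targetStates) {
        return Arrays.stream(targetStates)
            .map(State::toString)
            .collect(Collectors.joining(" or "));
    }

    public static void main(final String[] args) {
        // prints "Cannot transit to ERROR or NOT_RUNNING within 100ms"
        System.out.println("Cannot transit to "
            + describeTargets(State.ERROR, State.NOT_RUNNING) + " within 100ms");
    }
}
```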
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,38 +1547,30 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
- return true;
+ if (!setState(State.PENDING_SHUTDOWN)) {
+ final State stateCopy = state;
Review Comment:
Done