lucasbru commented on code in PR #18765:
URL: https://github.com/apache/kafka/pull/18765#discussion_r1939279250
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
Review Comment:
I tried to preserving the existing log messages. But yeah, it also came to
my mind that this could be simplified. I made a pass and cleaned up the cases.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
+ log.info("Streams client is in {}, all resources are being
closed and the client will be stopped.", state);
+ if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
Review Comment:
You are right, there is another race condition here, although I guess the
effect is not as bad as the actual bug that I'm fixing here. We are logging
that we ran into timeout when we didn't. Anyway, good find and I will
piggy-back a fix for that race condition in this PR.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
+ log.info("Streams client is in {}, all resources are being
closed and the client will be stopped.", state);
+ if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
+ log.info("Streams client stopped to ERROR completely");
+ return true;
+ } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+ log.info("Streams client stopped to NOT_RUNNING
completely");
+ return true;
+ } else {
+ log.warn("Streams client cannot transition to {}
completely within the timeout",
+ state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
+ return false;
+ }
+ }
+
+ if (state.hasCompletedShutdown()) {
+ log.info("Streams client is already in the terminal {} state,
all resources are closed and the client has stopped.", state);
return true;
- } else {
- log.warn("Streams client cannot transition to {} completely
within the timeout",
- state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
- return false;
}
- }
- if (!setState(State.PENDING_SHUTDOWN)) {
// if we can't transition to PENDING_SHUTDOWN but not because
we're already shutting down, then it must be fatal
Review Comment:
This should indeed be unreachable in the current implementation. In the
fixed code, this exception is just a safeguard in case somebody changes the
state transitions, which can break the invariants in this code. We don't want
to silently ignore it. I'll convert it into an `IllegalStateException`, to make
clear this is not expected.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
Review Comment:
We could keep it, but it would just duplicate cases. I think having fewer
branches is better.
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1543,26 +1544,28 @@ private boolean close(final Optional<Long> timeout,
final boolean leaveGroup) {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
- return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+
+ if (state.isShuttingDown()) {
+ log.info("Streams client is in {}, all resources are being
closed and the client will be stopped.", state);
+ if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
+ log.info("Streams client stopped to ERROR completely");
+ return true;
+ } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+ log.info("Streams client stopped to NOT_RUNNING
completely");
+ return true;
+ } else {
+ log.warn("Streams client cannot transition to {}
completely within the timeout",
+ state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
+ return false;
+ }
+ }
+
+ if (state.hasCompletedShutdown()) {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]