wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r540486098
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1142,11 +1128,21 @@ private Thread shutdownHelper(final boolean error) {
metrics.close();
if (!error) {
setState(State.NOT_RUNNING);
+ } else {
+ setState(State.ERROR);
}
}, "kafka-streams-close-thread");
}
private boolean close(final long timeoutMs) {
+ if (state == State.ERROR) {
Review comment:
We want to make `close()` idempotent and not throw an exception but we
will log a warning, but only for close so that is why these logs are not in the
`setState()` method.
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -594,9 +581,8 @@ public synchronized void onChange(final Thread thread,
if (newState == GlobalStreamThread.State.RUNNING) {
maybeSetRunning();
} else if (newState == GlobalStreamThread.State.DEAD) {
- if (setState(State.ERROR)) {
Review comment:
we will be doing the same thing but closing the client for now. Maybe a
replace globalThread will be added later
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -411,7 +405,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws
Exception {
errorInjectedClient1.set(false);
stateTransitions1.clear();
streams1Alpha.close();
- waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
+ final KafkaStreams finalStreams1Alpha = streams1Alpha;
+ waitForCondition(() -> finalStreams1Alpha.state() ==
State.ERROR, "Stream did not go to ERROR");
Review comment:
Just wait for ERROR because you don't close crashed
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
}
@Test
- @SuppressWarnings("deprecation")
Review comment:
remove deprecation
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -532,22 +537,6 @@ public void setGlobalStateRestoreListener(final
StateRestoreListener globalState
this.threadStatesLock = new Object();
}
- /**
- * If all threads are dead set to ERROR
- */
- private void maybeSetError() {
Review comment:
This will not be needed with the new error definition
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -546,7 +546,7 @@ public void run() {
* @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not
contain the partition
*/
- void runLoop() {
+ boolean runLoop() {
Review comment:
This will let Streams shutdown uncleanly when in EOS mode
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -418,70 +418,6 @@ public void
stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
}
- @Test
Review comment:
This test is for the functionality we are removing
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
"Thread never stopped.");
streams.threads.get(i).join();
}
- TestUtils.waitForCondition(
Review comment:
There is no transition to Error here anymore
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final
String stateDirPath,
}
@Test
- @Deprecated
Review comment:
remove deprecated handler
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -139,7 +139,7 @@ public void shouldShutdownClient() throws
InterruptedException {
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
produceMessages(0L, inputTopic, "A");
- waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+ waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
Review comment:
When the client is shutdown it now goes to ERROR
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -625,8 +561,7 @@ public void shouldNotAddThreadWhenError() {
final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
final int oldSize = streams.threads.size();
streams.start();
- streamThreadOne.shutdown();
- streamThreadTwo.shutdown();
+ globalStreamThread.shutdown();
Review comment:
removing stream threads does not put the client in error anymore. The
global does
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final
String stateDirPath,
}
@Test
- @Deprecated
public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing()
throws Exception {
+ final long time = System.currentTimeMillis();
Review comment:
There was a problem with the cleaner thread sometimes wiping out old
segments because they were expired
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws
Exception {
assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForStateTransition(stateTransitions2, CRASH);
-
commitErrorInjectedClient2.set(false);
stateTransitions2.clear();
streams2Alpha.close();
- waitForStateTransition(stateTransitions2, CLOSE_CRASHED);
Review comment:
No longer do you need to close crashed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]