showuon commented on a change in pull request #9733: URL: https://github.com/apache/kafka/pull/9733#discussion_r543882023
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -256,8 +261,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             streams2Alpha.cleanUp();
             streams2Alpha.start();
             assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);

Review comment:
@mjsax The stack trace is below. The line numbers do not map correctly to the master branch, but the method names show where it fails. The test fails while reading all state store data: the store lookup requires the stream thread to be in the `RUNNING` state, but the thread is still rebalancing. This exception is not retried.

```
Stacktrace
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store store because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
	at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
	at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
	at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.keysFromInstance(EosBetaUpgradeIntegrationTest.java:1112)
	at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:494)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
```

And yes, we do make sure the rebalance has completed before checking the running state. But as I mentioned, two rebalances happen (one for adding the new member, one for the leader re-joining the group during Stable), and we only wait for one of them to complete, so another rebalance may still follow.
The state transition logs of the two streams look like this:

```
stateTransitions1: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
stateTransitions2: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
```

So, as you can see, we might move on to the next step right after the second transition (`KeyValue(REBALANCING, RUNNING)`), while another rebalance is about to start. That's why I'll wait explicitly for this transition pair: `[KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]`
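
To illustrate the idea of waiting for the second rebalance explicitly, here is a minimal, self-contained sketch. It is not the code in this PR and does not use the Kafka Streams API: `TransitionWaitSketch`, `State`, `Transition`, `record`, `hasSeen`, and `waitFor` are all hypothetical names that stand in for the test's state listener and its `KeyValue(oldState, newState)` entries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

class TransitionWaitSketch {

    // Hypothetical stand-in for the stream states seen in the log above.
    enum State { CREATED, REBALANCING, RUNNING }

    // Hypothetical stand-in for a KeyValue(oldState, newState) log entry.
    static final class Transition {
        final State from;
        final State to;

        Transition(State from, State to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Transition)) {
                return false;
            }
            Transition other = (Transition) o;
            return from == other.from && to == other.to;
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }

    private final List<Transition> observed = new ArrayList<>();

    // Would be called from a state listener; wakes up any waiting thread.
    synchronized void record(State from, State to) {
        observed.add(new Transition(from, to));
        notifyAll();
    }

    // True if `expected` occurs as a contiguous run at or after `fromIndex`.
    synchronized boolean hasSeen(List<Transition> expected, int fromIndex) {
        int start = Math.min(fromIndex, observed.size());
        List<Transition> tail = observed.subList(start, observed.size());
        return Collections.indexOfSubList(tail, expected) >= 0;
    }

    // Blocks until `expected` has been seen or the timeout expires.
    synchronized boolean waitFor(List<Transition> expected, int fromIndex, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!hasSeen(expected, fromIndex)) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        TransitionWaitSketch sketch = new TransitionWaitSketch();
        // First rebalance: the instance starts and reaches RUNNING.
        sketch.record(State.CREATED, State.REBALANCING);
        sketch.record(State.REBALANCING, State.RUNNING);

        // Second rebalance arrives a little later on another thread.
        Thread later = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            sketch.record(State.RUNNING, State.REBALANCING);
            sketch.record(State.REBALANCING, State.RUNNING);
        });
        later.start();

        List<Transition> secondRebalance = Arrays.asList(
            new Transition(State.RUNNING, State.REBALANCING),
            new Transition(State.REBALANCING, State.RUNNING));
        boolean done = sketch.waitFor(secondRebalance, 0, 5000);
        later.join();
        System.out.println("second rebalance completed: " + done);
    }
}
```

The `fromIndex` parameter is a design choice of this sketch: for an instance whose log already starts with `RUNNING -> REBALANCING`, as `stateTransitions2` does, the pair occurs twice, so a caller would pass the number of transitions already consumed to make sure it waits for the later occurrence.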