showuon commented on a change in pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#discussion_r543882023
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -256,8 +261,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
streams2Alpha.cleanUp();
streams2Alpha.start();
assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
Review comment:
@mjsax
Here is the stack trace. The line numbers do not map exactly to the master
branch, but the method names show where it fails. The test failed while
reading all data from the state store: the store provider requires the stream
thread to be in the `RUNNING` state, but the thread was still rebalancing
(`PARTITIONS_ASSIGNED`). This exception is not retried.
```
Stacktrace
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store store because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.keysFromInstance(EosBetaUpgradeIntegrationTest.java:1112)
at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:494)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
```
And yes, we do make sure the rebalance has completed before checking the
running state. But as I mentioned, two rebalances happen (one for the new
member joining, one for the leader re-joining the group while it is Stable),
and we only wait for one rebalance to complete, so another rebalance may
follow. The state transition logs of the two streams instances look like this:
```
stateTransitions1:
[KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, RUNNING),
KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
stateTransitions2:
[KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING),
KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
```
So, as you can see, we might move on to the next step when we are only at the
second transition (`KeyValue(REBALANCING, RUNNING)`), while another rebalance
is about to start. That's why I explicitly wait for this transition pair:
`[KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]`
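To illustrate the idea, here is a minimal sketch of such an explicit wait in plain Java, without any Kafka dependencies. `TransitionWaitSketch`, `Transition`, and `waitForTransitions` are hypothetical names made up for this example, not the helpers used in the test. The sketch assumes the listener appends transitions to a thread-safe list and that we remember how many transitions were already recorded before the step that triggers the second rebalance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

final class TransitionWaitSketch {
    enum State { CREATED, REBALANCING, RUNNING }

    // Hypothetical stand-in for the KeyValue(oldState, newState) pairs in the log above.
    static final class Transition {
        final State from;
        final State to;

        Transition(State from, State to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Transition)) {
                return false;
            }
            Transition t = (Transition) o;
            return from == t.from && to == t.to;
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }

    // Polls until `expected` appears as a contiguous run in `observed` at or after
    // `fromIndex`, or until the timeout expires. `observed` must be a thread-safe list.
    static boolean waitForTransitions(List<Transition> observed, int fromIndex,
                                      List<Transition> expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            List<Transition> snapshot = new ArrayList<>(observed);
            int start = Math.min(fromIndex, snapshot.size());
            List<Transition> tail = snapshot.subList(start, snapshot.size());
            if (Collections.indexOfSubList(tail, expected) >= 0) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Transitions already recorded after the first stable assignment.
        List<Transition> log = new CopyOnWriteArrayList<>();
        log.add(new Transition(State.CREATED, State.REBALANCING));
        log.add(new Transition(State.REBALANCING, State.RUNNING));
        int alreadySeen = log.size();

        List<Transition> secondRebalance = Arrays.asList(
            new Transition(State.RUNNING, State.REBALANCING),
            new Transition(State.REBALANCING, State.RUNNING));

        // Simulates the state listener reporting the second rebalance later.
        Thread listener = new Thread(() -> {
            log.add(new Transition(State.RUNNING, State.REBALANCING));
            log.add(new Transition(State.REBALANCING, State.RUNNING));
        });
        listener.start();

        System.out.println(waitForTransitions(log, alreadySeen, secondRebalance, 5_000));
    }
}
```

Searching only from `alreadySeen` onward is what keeps the first `REBALANCING -> RUNNING` transition from satisfying the wait too early.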