showuon commented on a change in pull request #9733:
URL: https://github.com/apache/kafka/pull/9733#discussion_r543882023



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -256,8 +261,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             streams2Alpha.cleanUp();
             streams2Alpha.start();
             assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);

Review comment:
       @mjsax 
   Here is the stack trace. The line numbers do not map exactly to the master 
branch, but the method names show where it fails. The test fails while reading 
all state store data: the store can only be queried when the stream thread is 
in the `RUNNING` state, but the thread was still rebalancing 
(`PARTITIONS_ASSIGNED`). Nothing retries after this exception.
   ```
   Stacktrace
   
   org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store store because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
        at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
        at org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
        at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.all(CompositeReadOnlyKeyValueStore.java:119)
        at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.keysFromInstance(EosBetaUpgradeIntegrationTest.java:1112)
        at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:494)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   ```
   
   
   And yes, we do wait for the rebalance to complete before checking the 
running state. But as I mentioned, two rebalances happen (one for adding the 
new member, one for the leader re-joining the group during Stable), and we only 
wait for one of them to complete, so another rebalance may follow. The state 
transition logs of the 2 streams look like this:
   ```
   stateTransitions1:
   [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
   
   stateTransitions2:
   [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]
   ```
   So, as you can see, we might move on to the next step right after the 2nd 
transition (`KeyValue(REBALANCING, RUNNING)`), while another rebalance is about 
to start. That's why I'll explicitly wait for the transition pair 
`[KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)]`.
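
To make the idea concrete, here is a minimal, self-contained sketch of waiting 
for a specific transition sequence. It is not the code in this PR and does not 
use the test's own helpers: `TransitionWaiter`, `Transition`, `record`, and 
`awaitSuffix` are hypothetical names, and the `State` enum is a stand-in for 
`KafkaStreams.State` so that the example runs without Kafka on the classpath. 
In a real test, `record` would presumably be called from a listener registered 
via `KafkaStreams#setStateListener`, which passes `(newState, oldState)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka: records state transitions and lets a
// test block until the most recent transitions match an expected sequence.
final class TransitionWaiter {
    /** Stand-in for KafkaStreams.State, limited to the states in the log above. */
    public enum State { CREATED, REBALANCING, RUNNING }

    /** One (old state, new state) pair, like a KeyValue(old, new) log entry. */
    public static final class Transition {
        final State from;
        final State to;

        public Transition(State from, State to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Transition)) return false;
            Transition other = (Transition) o;
            return from == other.from && to == other.to;
        }

        @Override
        public int hashCode() { return Objects.hash(from, to); }

        @Override
        public String toString() { return "(" + from + ", " + to + ")"; }
    }

    private final List<Transition> observed = new ArrayList<>();

    /** Record one transition and wake up any thread blocked in awaitSuffix. */
    public synchronized void record(State oldState, State newState) {
        observed.add(new Transition(oldState, newState));
        notifyAll();
    }

    /**
     * Block until the recorded transitions end with `expected`.
     * Returns false on timeout or if the waiting thread is interrupted.
     */
    public synchronized boolean awaitSuffix(List<Transition> expected, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!endsWith(expected)) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) return false;
            try {
                wait(remainingMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    private boolean endsWith(List<Transition> expected) {
        int offset = observed.size() - expected.size();
        return offset >= 0 && observed.subList(offset, observed.size()).equals(expected);
    }

    public static void main(String[] args) {
        TransitionWaiter waiter = new TransitionWaiter();
        List<Transition> secondRebalance = Arrays.asList(
            new Transition(State.RUNNING, State.REBALANCING),
            new Transition(State.REBALANCING, State.RUNNING));

        // Replay the first rebalance from stateTransitions1: not enough yet.
        waiter.record(State.CREATED, State.REBALANCING);
        waiter.record(State.REBALANCING, State.RUNNING);
        System.out.println("after 1st rebalance: " + waiter.awaitSuffix(secondRebalance, 50));

        // Replay the second rebalance: now the expected pair has been seen.
        waiter.record(State.RUNNING, State.REBALANCING);
        waiter.record(State.REBALANCING, State.RUNNING);
        System.out.println("after 2nd rebalance: " + waiter.awaitSuffix(secondRebalance, 50));
    }
}
```

Note that a suffix match like this only distinguishes the second rebalance for 
a client whose history starts from `CREATED`, as in `stateTransitions1`. For a 
client that was already `RUNNING`, the recorded list would have to be cleared 
before the step, or the check changed to an exact comparison.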





