mjsax commented on a change in pull request #9688: URL: https://github.com/apache/kafka/pull/9688#discussion_r535950501
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java ########## @@ -457,33 +439,46 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { // phase 6: (complete second batch of data; crash: let second client fail on commit) // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes): // - // stop case: - // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C - // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C - // p-2: 10 rec + C + 5 rec + C ---> 5 rec + C - // p-3: 10 rec + C + 5 rec + C ---> 5 rec + C - // crash case: - // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C - // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C - // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C - // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C + // stop case: (both client commit regularly) + // (depending on the task movement in phase 5, we may or may not get newly committed data; + // we show the case for which p-2 and p-3 are newly committed below) + // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C + // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C + // p-2: 10 rec + C + 5 rec ---> 5 rec + C + // p-3: 10 rec + C + 5 rec ---> 5 rec + C + // crash case: (second/alpha client fails and both TX are aborted) + // (first/beta client reprocessed the 10 records and commits TX) + // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C + // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C + // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C + // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C commitCounterClient1.set(0); if (!injectError) { - final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = - prepareData(15L, 20L, 0L, 1L, 2L, 3L); - writeInputData(committedInputDataDuringUpgrade); + final List<KeyValue<Long, Long>> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L); + writeInputData(finishSecondBatch); + final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade Review comment: This is the second fix: depending on task movement, we have different set of committed records. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org