guozhangwang commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r538062610
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
  }
  // 7. only for crash case:
- // 7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+ // 7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes
  // 7b. write third batch of input data
  // * fail the first (i.e., eos-beta) client during commit
  // * the eos-alpha client should not pickup the pending offsets
  // * verify uncommitted and committed result
  // 7c. restart the first client in eos-beta mode and wait until rebalance stabilizes
  //
  // crash case:
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C

Review comment:
Are these changes intentional?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -457,33 +439,46 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
  // phase 6: (complete second batch of data; crash: let second client fail on commit)
  // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
  //
- // stop case:
- // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
- // crash case:
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
- // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+ // stop case: (both client commit regularly)
+ // (depending on the task movement in phase 5, we may or may not get newly committed data;
+ // we show the case for which p-2 and p-3 are newly committed below)
+ // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec ---> 5 rec + C
+ // p-3: 10 rec + C + 5 rec ---> 5 rec + C
+ // crash case: (second/alpha client fails and both TX are aborted)
+ // (first/beta client reprocessed the 10 records and commits TX)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+ // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
  commitCounterClient1.set(0);
  if (!injectError) {
- final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade =
- prepareData(15L, 20L, 0L, 1L, 2L, 3L);
- writeInputData(committedInputDataDuringUpgrade);
+ final List<KeyValue<Long, Long>> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+ writeInputData(finishSecondBatch);
+ final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade

Review comment:
Nice catch. Reminds me though, why the second rebalance may not be deterministic in migrating tasks back? I thought our algorithm should produce deterministic results? cc @ableegoldman

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
  }
  // 7. only for crash case:
- // 7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+ // 7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes

Review comment:
nit: second failed client?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
  // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
  //
  // stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+ // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
  // crash case: (we just assumes that we inject the error for p-2; in reality it might be a different partition)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
  commitCounterClient1.set(-1);
  commitCounterClient2.set(-1);
- final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+ final List<KeyValue<Long, Long>> finishLastBatch =
  prepareData(35L, 40L, 0L, 1L, 2L, 3L);
- writeInputData(committedInputDataAfterUpgrade);
+ writeInputData(finishLastBatch);
+
+ final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+ uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+ uncommittedKeys.removeAll(newlyCommittedKeys);
+ final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
@ableegoldman is it related to the UUID randomness? If yes please ignore my other question above.