guozhangwang commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r538062610



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws 
Exception {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait 
until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and 
wait until rebalance stabilizes
             //     7b. write third batch of input data
             //         * fail the first (i.e., eos-beta) client during commit
             //         * the eos-alpha client should not pickup the pending 
offsets
             //         * verify uncommitted and committed result
             //     7c. restart the first client in eos-beta mode and wait 
until rebalance stabilizes
             //
             // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 
rec + A + 10 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 
rec + A + 10 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 
rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 
rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C ---> 
10 rec + A + 10 rec + C

Review comment:
       Are these changes intentional?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -457,33 +439,46 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws 
Exception {
             // phase 6: (complete second batch of data; crash: let second 
client fail on commit)
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+            // stop case: (both client commit regularly)
+            //            (depending on the task movement in phase 5, we may 
or may not get newly committed data;
+            //             we show the case for which p-2 and p-3 are newly 
committed below)
+            //   p-0: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec     ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec     ---> 5 rec + C
+            // crash case: (second/alpha client fails and both TX are aborted)
+            //             (first/beta client reprocessed the 10 records and 
commits TX)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 
5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 
5 rec + C
             commitCounterClient1.set(0);
 
             if (!injectError) {
-                final List<KeyValue<Long, Long>> 
committedInputDataDuringUpgrade =
-                    prepareData(15L, 20L, 0L, 1L, 2L, 3L);
-                writeInputData(committedInputDataDuringUpgrade);
+                final List<KeyValue<Long, Long>> finishSecondBatch = 
prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+                writeInputData(finishSecondBatch);
 
+                final List<KeyValue<Long, Long>> 
committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade

Review comment:
       Nice catch.
   
   That reminds me, though: why might the second rebalance not be deterministic in 
migrating tasks back? I thought our algorithm should produce deterministic 
results. cc @ableegoldman

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -540,18 +535,18 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws 
Exception {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait 
until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and 
wait until rebalance stabilizes

Review comment:
       nit: second failed client?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws 
Exception {
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; 
in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = 
uncommittedInputDataBeforeSecondUpgrade

Review comment:
       @ableegoldman is this related to the UUID randomness? If so, please ignore 
my other question above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

