ableegoldman commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r536321595



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -916,11 +917,12 @@ public void close() {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());

Review comment:
       5 minutes seems kind of long: the whole test should take only a few 
minutes, and it has 11 phases. Would 1 minute be more reasonable? Or do we 
actually need this timeout to cover more than one or two phases?
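
For illustration, the 1-minute alternative raised above could be sketched as follows. This is a standalone sketch that uses only the JDK, not the test's actual code: the class name `TransactionTimeoutSketch` and the helper `withTransactionTimeout` are hypothetical, and the literal key `producer.transaction.timeout.ms` is assumed to be what `StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)` resolves to.

```java
import java.time.Duration;
import java.util.Properties;

class TransactionTimeoutSketch {
    // Assumed literal form of
    // StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG).
    static final String PRODUCER_TRANSACTION_TIMEOUT = "producer.transaction.timeout.ms";

    // Hypothetical helper: builds properties carrying only the transaction timeout.
    static Properties withTransactionTimeout(final Duration timeout) {
        final Properties properties = new Properties();
        // The timeout is an int-typed config; toIntExact throws on overflow
        // instead of silently truncating like a plain (int) cast would.
        properties.put(PRODUCER_TRANSACTION_TIMEOUT, Math.toIntExact(timeout.toMillis()));
        return properties;
    }

    public static void main(final String[] args) {
        final Properties properties = withTransactionTimeout(Duration.ofMinutes(1L));
        System.out.println(properties.get(PRODUCER_TRANSACTION_TIMEOUT)); // prints 60000
    }
}
```

Whether 1 minute is enough still depends on how many phases a single open transaction has to survive, which is the open question in the comment.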

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C ---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       I'm guessing the root cause of all this is a bad assumption that the 
assignment would be stable if a stable `CLIENT_ID` was used? I remember we 
discussed that back when you first wrote this test. I'm sorry for any 
misinformation I supplied based on my own assumption about how the `CLIENT_ID` 
would be used :/

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -319,24 +293,26 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             // crash case: (we just assumes that we inject the error for p-0; in reality it might be a different partition)
+            //             (we don't crash right away and write one record less)
             //   p-0: 10 rec + C ---> 4 rec (pending)
             //   p-1: 10 rec + C ---> 5 rec (pending)
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             final Set<Long> cleanKeys = mkSet(0L, 1L, 2L, 3L);
-            final Set<Long> keyFilterFirstClient = keysFromInstance(streams1Alpha);
-            final long potentiallyFirstFailingKey = keyFilterFirstClient.iterator().next();
-            cleanKeys.remove(potentiallyFirstFailingKey);
+            final Set<Long> keysFirstClientAlpha = keysFromInstance(streams1Alpha);
+            final long firstFailingKeyForCrashCase = keysFirstClientAlpha.iterator().next();

Review comment:
       Thanks for cleaning up the variable names 🙂 





