ableegoldman commented on a change in pull request #9688:
URL: https://github.com/apache/kafka/pull/9688#discussion_r536321595
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -916,11 +917,12 @@ public void close() {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());

Review comment:
       5 minutes seems kind of long; the whole test should take only a few minutes, and it has 11 phases. Would 1 minute be more reasonable? Or do we actually need this timeout to cover more than one or two phases?
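For context on the pattern under review: it replaces bare millisecond literals with `Duration`-based values so the time unit is explicit at the call site. Below is a minimal sketch of that pattern, not code from the PR; it uses the 1-minute transaction timeout floated in the comment above, and the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutConfigSketch {

    public static Properties timeoutProperties() {
        final Properties properties = new Properties();

        // Duration makes the time unit explicit; these configs expect ints, hence the casts.
        properties.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
            (int) Duration.ofSeconds(5L).toMillis());

        // Hypothetical value: the 1-minute transaction timeout suggested in the review,
        // instead of the 5 minutes the PR currently sets.
        properties.put(
            StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
            (int) Duration.ofMinutes(1L).toMillis());

        return properties;
    }
}
```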
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -792,24 +790,35 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
         // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
         //
         // stop case:
-        // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-        // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-        // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
-        // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+        // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+        // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+        // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
+        // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
         // crash case: (we just assumes that we inject the error for p-2; in reality it might be a different partition)
-        // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-        // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
-        // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-        // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+        // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
+        // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
+        // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
+        // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
         commitCounterClient1.set(-1);
         commitCounterClient2.set(-1);

-        final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+        final List<KeyValue<Long, Long>> finishLastBatch =
             prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-        writeInputData(committedInputDataAfterUpgrade);
+        writeInputData(finishLastBatch);
+
+        final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+        uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+        uncommittedKeys.removeAll(newlyCommittedKeys);
+        final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade

Review comment:
       I'm guessing the root source of this all is a bad assumption that the assignment would be stable if a stable `CLIENT_ID` was used? I remember we discussed that back when you first wrote this test; I'm sorry for any misinformation I supplied based on my own assumption about how the CLIENT_ID would be used :/

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -319,24 +293,26 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
         // p-2: 10 rec + C ---> 5 rec (pending)
         // p-3: 10 rec + C ---> 5 rec (pending)
         // crash case: (we just assumes that we inject the error for p-0; in reality it might be a different partition)
+        // (we don't crash right away and write one record less)
         // p-0: 10 rec + C ---> 4 rec (pending)
         // p-1: 10 rec + C ---> 5 rec (pending)
         // p-2: 10 rec + C ---> 5 rec (pending)
         // p-3: 10 rec + C ---> 5 rec (pending)

         final Set<Long> cleanKeys = mkSet(0L, 1L, 2L, 3L);
-        final Set<Long> keyFilterFirstClient = keysFromInstance(streams1Alpha);
-        final long potentiallyFirstFailingKey = keyFilterFirstClient.iterator().next();
-        cleanKeys.remove(potentiallyFirstFailingKey);
+        final Set<Long> keysFirstClientAlpha = keysFromInstance(streams1Alpha);
+        final long firstFailingKeyForCrashCase = keysFirstClientAlpha.iterator().next();

Review comment:
       Thanks for cleaning up the variable names 🙂
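For readers following the thread: the helper `keysFromInstance` in the hunk above exists because a Streams instance's partition assignment is not guaranteed to be stable across restarts, even with a fixed CLIENT_ID, so the test queries the running instance for what it currently owns. Below is a sketch of how such a helper could be built on the public thread-metadata API; it is an illustration under stated assumptions, not the test's actual implementation, and the key-to-partition mapping is assumed.

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class AssignmentLookupSketch {

    // Ask the running instance which input partitions it currently owns, instead of
    // assuming the assignment stays stable across restarts and rebalances.
    public static Set<Long> keysFromInstance(final KafkaStreams streams, final String inputTopic) {
        final Set<Long> keys = new HashSet<>();
        for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
            for (final TaskMetadata task : thread.activeTasks()) {
                for (final TopicPartition partition : task.topicPartitions()) {
                    if (partition.topic().equals(inputTopic)) {
                        // Assumption for illustration: key k always lands in partition k,
                        // as with keys 0L-3L spread over four input partitions.
                        keys.add((long) partition.partition());
                    }
                }
            }
        }
        return keys;
    }
}
```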