This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 8bcb05d  KAFKA-10017: fix flaky EOS-beta upgrade test (#9688)
8bcb05d is described below

commit 8bcb05d604e611d2396c4d22cc4573488e9edb6a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Dec 10 17:32:30 2020 -0800

    KAFKA-10017: fix flaky EOS-beta upgrade test (#9688)
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../integration/EosBetaUpgradeIntegrationTest.java | 363 +++++++++++----------
 .../integration/utils/IntegrationTestUtils.java    |   2 +-
 2 files changed, 192 insertions(+), 173 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index d039d98..8129d05 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -43,10 +42,7 @@ import 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAss
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -83,7 +79,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -105,8 +100,8 @@ public class EosBetaUpgradeIntegrationTest {
     public boolean injectError;
 
     private static final int NUM_BROKERS = 3;
-    private static final int MAX_POLL_INTERVAL_MS = 100 * 1000;
-    private static final int MAX_WAIT_TIME_MS = 60 * 1000;
+    private static final int MAX_POLL_INTERVAL_MS = (int) 
Duration.ofSeconds(100L).toMillis();
+    private static final long MAX_WAIT_TIME_MS = 
Duration.ofMinutes(1L).toMillis();
 
     private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> CLOSE =
         Collections.unmodifiableList(
@@ -152,27 +147,6 @@ public class EosBetaUpgradeIntegrationTest {
     private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
     private final AtomicInteger commitRequested = new AtomicInteger(0);
 
-    // Note: this pattern only works when we just have a single instance 
running with a single thread
-    // If we want to extend the test or reuse this CommitPunctuator we should 
tighten it up
-    private final AtomicBoolean requestCommit = new AtomicBoolean(false);
-    private static class CommitPunctuator implements Punctuator {
-        final ProcessorContext context;
-        final AtomicBoolean requestCommit;
-
-        public CommitPunctuator(final ProcessorContext context, final 
AtomicBoolean requestCommit) {
-            this.context = context;
-            this.requestCommit = requestCommit;
-        }
-
-        @Override
-        public void punctuate(final long timestamp) {
-            if (requestCommit.get()) {
-                context.commit();
-                requestCommit.set(false);
-            }
-        }
-    }
-
     private Throwable uncaughtException;
 
     private int testNumber = 0;
@@ -319,16 +293,18 @@ public class EosBetaUpgradeIntegrationTest {
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             // crash case: (we just assumes that we inject the error for p-0; 
in reality it might be a different partition)
+            //             (we don't crash right away and write one record 
less)
             //   p-0: 10 rec + C ---> 4 rec (pending)
             //   p-1: 10 rec + C ---> 5 rec (pending)
             //   p-2: 10 rec + C ---> 5 rec (pending)
             //   p-3: 10 rec + C ---> 5 rec (pending)
             final Set<Long> cleanKeys = mkSet(0L, 1L, 2L, 3L);
-            final Set<Long> keyFilterFirstClient = 
keysFromInstance(streams1Alpha);
-            final long potentiallyFirstFailingKey = 
keyFilterFirstClient.iterator().next();
-            cleanKeys.remove(potentiallyFirstFailingKey);
+            final Set<Long> keysFirstClientAlpha = 
keysFromInstance(streams1Alpha);
+            final long firstFailingKeyForCrashCase = 
keysFirstClientAlpha.iterator().next();
+            cleanKeys.remove(firstFailingKeyForCrashCase);
 
             final List<KeyValue<Long, Long>> 
uncommittedInputDataBeforeFirstUpgrade = new LinkedList<>();
+            final HashMap<Long, Long> uncommittedState = new 
HashMap<>(committedState);
             if (!injectError) {
                 uncommittedInputDataBeforeFirstUpgrade.addAll(
                     prepareData(10L, 15L, 0L, 1L, 2L, 3L)
@@ -336,7 +312,7 @@ public class EosBetaUpgradeIntegrationTest {
                 writeInputData(uncommittedInputDataBeforeFirstUpgrade);
 
                 expectedUncommittedResult.addAll(
-                    
computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, new 
HashMap<>(committedState))
+                    
computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, uncommittedState)
                 );
                 verifyUncommitted(expectedUncommittedResult);
             } else {
@@ -345,7 +321,7 @@ public class EosBetaUpgradeIntegrationTest {
                     
uncommittedInputDataWithoutFailingKey.addAll(prepareData(10L, 15L, key));
                 }
                 uncommittedInputDataWithoutFailingKey.addAll(
-                    prepareData(10L, 14L, potentiallyFirstFailingKey)
+                    prepareData(10L, 14L, firstFailingKeyForCrashCase)
                 );
                 
uncommittedInputDataBeforeFirstUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
                 writeInputData(uncommittedInputDataWithoutFailingKey);
@@ -356,19 +332,20 @@ public class EosBetaUpgradeIntegrationTest {
                 verifyUncommitted(expectedUncommittedResult);
             }
 
-            // phase 4: (stop/crash first client)
+            // phase 4: (stop first client)
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec ---> C
-            //   p-1: 10 rec + C + 5 rec ---> C
-            //   p-2: 10 rec + C + 5 rec (pending)
-            //   p-3: 10 rec + C + 5 rec (pending)
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec ---> A + 5 rec (pending)
-            //   p-1: 10 rec + C + 5 rec ---> A + 5 rec (pending)
-            //   p-2: 10 rec + C + 5 rec (pending)
-            //   p-3: 10 rec + C + 5 rec (pending)
+            // stop case: (client 1 will commit its two tasks on close())
+            //   p-0: 10 rec + C   +   5 rec ---> C
+            //   p-1: 10 rec + C   +   5 rec ---> C
+            //   p-2: 10 rec + C   +   5 rec (pending)
+            //   p-3: 10 rec + C   +   5 rec (pending)
+            // crash case: (we write the last record that will trigger the 
crash; both TX from client 1 will be aborted
+            //              during fail over by client 2 and retried)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec (pending)
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec (pending)
+            //   p-2: 10 rec + C   +   5 rec (pending)
+            //   p-3: 10 rec + C   +   5 rec (pending)
             stateTransitions2.clear();
             assignmentListener.prepareForRebalance();
 
@@ -380,7 +357,7 @@ public class EosBetaUpgradeIntegrationTest {
                 errorInjectedClient1.set(true);
 
                 final List<KeyValue<Long, Long>> 
dataPotentiallyFirstFailingKey =
-                    prepareData(14L, 15L, potentiallyFirstFailingKey);
+                    prepareData(14L, 15L, firstFailingKeyForCrashCase);
                 
uncommittedInputDataBeforeFirstUpgrade.addAll(dataPotentiallyFirstFailingKey);
                 writeInputData(dataPotentiallyFirstFailingKey);
             }
@@ -391,7 +368,7 @@ public class EosBetaUpgradeIntegrationTest {
                 final List<KeyValue<Long, Long>> 
committedInputDataDuringFirstUpgrade =
                     uncommittedInputDataBeforeFirstUpgrade
                         .stream()
-                        .filter(pair -> 
keyFilterFirstClient.contains(pair.key))
+                        .filter(pair -> 
keysFirstClientAlpha.contains(pair.key))
                         .collect(Collectors.toList());
 
                 final List<KeyValue<Long, Long>> expectedCommittedResult =
@@ -402,7 +379,7 @@ public class EosBetaUpgradeIntegrationTest {
                 expectedUncommittedResult.addAll(computeExpectedResult(
                     uncommittedInputDataBeforeFirstUpgrade
                         .stream()
-                        .filter(pair -> 
keyFilterFirstClient.contains(pair.key))
+                        .filter(pair -> 
keysFirstClientAlpha.contains(pair.key))
                         .collect(Collectors.toList()),
                     new HashMap<>(committedState)
                 ));
@@ -417,18 +394,20 @@ public class EosBetaUpgradeIntegrationTest {
             // phase 5: (restart first client)
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C
-            //   p-2: 10 rec + C + 5 rec ---> C
-            //   p-3: 10 rec + C + 5 rec ---> C
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec ---> C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
-            //   p-2: 10 rec + C + 5 rec ---> C
-            //   p-3: 10 rec + C + 5 rec ---> C
-            requestCommit.set(true);
-            waitForCondition(() -> !requestCommit.get(), "Punctuator did not 
request commit for running client");
+            // stop case: (client 2 (alpha) will commit the two revoked tasks 
that migrate back to client 1)
+            //            (note: we may or may not get newly committed data, 
depending if the already committed tasks
+            //             migrate back to client 1, or different tasks)
+            //            (below we show the case for which we don't get newly 
committed data)
+            //   p-0: 10 rec + C   +   5 rec ---> C
+            //   p-1: 10 rec + C   +   5 rec ---> C
+            //   p-2: 10 rec + C   +   5 rec (pending)
+            //   p-3: 10 rec + C   +   5 rec (pending)
+            // crash case: (client 2 (alpha) will commit all tasks even though 
only two tasks are revoked and migrate back to client 1)
+            //             (note: because nothing was committed originally, we 
always get newly committed data)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec ---> C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec ---> C
+            //   p-2: 10 rec + C   +   5 rec ---> C
+            //   p-3: 10 rec + C   +   5 rec ---> C
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -440,15 +419,18 @@ public class EosBetaUpgradeIntegrationTest {
             waitForRunning(stateTransitions1);
             waitForRunning(stateTransitions2);
 
-            final Set<Long> committedKeys = mkSet(0L, 1L, 2L, 3L);
+            final Set<Long> newlyCommittedKeys;
             if (!injectError) {
-                committedKeys.removeAll(keyFilterFirstClient);
+                newlyCommittedKeys = keysFromInstance(streams1Beta);
+                newlyCommittedKeys.removeAll(keysFirstClientAlpha);
+            } else {
+                newlyCommittedKeys = mkSet(0L, 1L, 2L, 3L);
             }
 
             final List<KeyValue<Long, Long>> 
expectedCommittedResultAfterRestartFirstClient = computeExpectedResult(
                 uncommittedInputDataBeforeFirstUpgrade
                     .stream()
-                    .filter(pair -> committedKeys.contains(pair.key))
+                    .filter(pair -> newlyCommittedKeys.contains(pair.key))
                     .collect(Collectors.toList()),
                 committedState
             );
@@ -457,33 +439,46 @@ public class EosBetaUpgradeIntegrationTest {
             // phase 6: (complete second batch of data; crash: let second 
client fail on commit)
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
-            // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+            // stop case: (both clients commit regularly)
+            //            (depending on the task movement in phase 5, we may 
or may not get newly committed data;
+            //             we show the case for which p-2 and p-3 are newly 
committed below)
+            //   p-0: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec     ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec     ---> 5 rec + C
+            // crash case: (second/alpha client fails and both TX are aborted)
+            //             (first/beta client reprocesses the 10 records and 
commits TX)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 
5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C             ---> 5 rec + A + 
5 rec + C
             commitCounterClient1.set(0);
 
             if (!injectError) {
-                final List<KeyValue<Long, Long>> 
committedInputDataDuringUpgrade =
-                    prepareData(15L, 20L, 0L, 1L, 2L, 3L);
-                writeInputData(committedInputDataDuringUpgrade);
+                final List<KeyValue<Long, Long>> finishSecondBatch = 
prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+                writeInputData(finishSecondBatch);
+
+                final List<KeyValue<Long, Long>> 
committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade
+                    .stream()
+                    .filter(pair -> !keysFirstClientAlpha.contains(pair.key))
+                    .filter(pair -> !newlyCommittedKeys.contains(pair.key))
+                    .collect(Collectors.toList());
+                committedInputDataDuringUpgrade.addAll(
+                    finishSecondBatch
+                );
 
+                expectedUncommittedResult.addAll(
+                    computeExpectedResult(finishSecondBatch, uncommittedState)
+                );
                 final List<KeyValue<Long, Long>> expectedCommittedResult =
                     computeExpectedResult(committedInputDataDuringUpgrade, 
committedState);
                 verifyCommitted(expectedCommittedResult);
-                expectedUncommittedResult.addAll(expectedCommittedResult);
             } else {
-                final Set<Long> keysFirstClient = 
keysFromInstance(streams1Beta);
-                final Set<Long> keysSecondClient = 
keysFromInstance(streams2Alpha);
+                final Set<Long> keysFirstClientBeta = 
keysFromInstance(streams1Beta);
+                final Set<Long> keysSecondClientAlpha = 
keysFromInstance(streams2Alpha);
 
                 final List<KeyValue<Long, Long>> 
committedInputDataAfterFirstUpgrade =
-                    prepareData(15L, 20L, keysFirstClient.toArray(new 
Long[0]));
+                    prepareData(15L, 20L, keysFirstClientBeta.toArray(new 
Long[0]));
                 writeInputData(committedInputDataAfterFirstUpgrade);
 
                 final List<KeyValue<Long, Long>> 
expectedCommittedResultBeforeFailure =
@@ -493,16 +488,16 @@ public class EosBetaUpgradeIntegrationTest {
 
                 commitCounterClient2.set(0);
 
-                final Iterator<Long> it = keysSecondClient.iterator();
+                final Iterator<Long> it = keysSecondClientAlpha.iterator();
                 final Long otherKey = it.next();
                 final Long failingKey = it.next();
 
                 final List<KeyValue<Long, Long>> 
uncommittedInputDataAfterFirstUpgrade =
-                    prepareData(15L, 19L, keysSecondClient.toArray(new 
Long[0]));
+                    prepareData(15L, 19L, keysSecondClientAlpha.toArray(new 
Long[0]));
                 uncommittedInputDataAfterFirstUpgrade.addAll(prepareData(19L, 
20L, otherKey));
                 writeInputData(uncommittedInputDataAfterFirstUpgrade);
 
-                final Map<Long, Long> uncommittedState = new 
HashMap<>(committedState);
+                uncommittedState.putAll(committedState);
                 expectedUncommittedResult.addAll(
                     
computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, uncommittedState)
                 );
@@ -540,7 +535,7 @@ public class EosBetaUpgradeIntegrationTest {
             }
 
             // 7. only for crash case:
-            //     7a. restart the second client in eos-alpha mode and wait 
until rebalance stabilizes
+            //     7a. restart the failed second client in eos-alpha mode and 
wait until rebalance stabilizes
             //     7b. write third batch of input data
             //         * fail the first (i.e., eos-beta) client during commit
             //         * the eos-alpha client should not pickup the pending 
offsets
@@ -548,10 +543,10 @@ public class EosBetaUpgradeIntegrationTest {
             //     7c. restart the first client in eos-beta mode and wait 
until rebalance stabilizes
             //
             // crash case:
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 
rec + A + 10 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 
rec + A + 10 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 
rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 
rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C ---> 
10 rec + A + 10 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C ---> 
10 rec + A + 10 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C ---> 
10 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C ---> 
10 rec + C
             if (!injectError) {
                 streams2AlphaTwo = streams2Alpha;
             } else {
@@ -571,11 +566,11 @@ public class EosBetaUpgradeIntegrationTest {
                 waitForRunning(stateTransitions2);
 
                 // 7b. write third batch of input data
-                final Set<Long> keysFirstClient = 
keysFromInstance(streams1Beta);
-                final Set<Long> keysSecondClient = 
keysFromInstance(streams2AlphaTwo);
+                final Set<Long> keysFirstClientBeta = 
keysFromInstance(streams1Beta);
+                final Set<Long> keysSecondClientAlphaTwo = 
keysFromInstance(streams2AlphaTwo);
 
                 final List<KeyValue<Long, Long>> 
committedInputDataBetweenUpgrades =
-                    prepareData(20L, 30L, keysSecondClient.toArray(new 
Long[0]));
+                    prepareData(20L, 30L, keysSecondClientAlphaTwo.toArray(new 
Long[0]));
                 writeInputData(committedInputDataBetweenUpgrades);
 
                 final List<KeyValue<Long, Long>> 
expectedCommittedResultBeforeFailure =
@@ -585,16 +580,16 @@ public class EosBetaUpgradeIntegrationTest {
 
                 commitCounterClient2.set(0);
 
-                final Iterator<Long> it = keysFirstClient.iterator();
+                final Iterator<Long> it = keysFirstClientBeta.iterator();
                 final Long otherKey = it.next();
                 final Long failingKey = it.next();
 
                 final List<KeyValue<Long, Long>> 
uncommittedInputDataBetweenUpgrade =
-                    prepareData(20L, 29L, keysFirstClient.toArray(new 
Long[0]));
+                    prepareData(20L, 29L, keysFirstClientBeta.toArray(new 
Long[0]));
                 uncommittedInputDataBetweenUpgrade.addAll(prepareData(29L, 
30L, otherKey));
                 writeInputData(uncommittedInputDataBetweenUpgrade);
 
-                final Map<Long, Long> uncommittedState = new 
HashMap<>(committedState);
+                uncommittedState.putAll(committedState);
                 expectedUncommittedResult.addAll(
                     computeExpectedResult(uncommittedInputDataBetweenUpgrade, 
uncommittedState)
                 );
@@ -640,23 +635,24 @@ public class EosBetaUpgradeIntegrationTest {
                 waitForRunning(stateTransitions2);
             }
 
-            // phase 8: (write partial fourth batch of data)
+            // phase 8: (write partial last batch of data)
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
-            // crash case:  (we just assumes that we inject the error for p-2; 
in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C ---> 5 rec (pending)
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C ---> 5 rec (pending)
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C ---> 4 rec (pending)
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C ---> 5 rec (pending)
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C ---> 5 rec 
(pending)
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C ---> 5 rec 
(pending)
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C ---> 5 rec 
(pending)
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C ---> 5 rec 
(pending)
+            // crash case: (we just assume that we inject the error for p-2; 
in reality it might be a different partition)
+            //             (we don't crash right away and write one record 
less)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C ---> 5 rec (pending)
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C ---> 5 rec (pending)
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C              ---> 4 rec (pending)
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C              ---> 5 rec (pending)
             cleanKeys.addAll(mkSet(0L, 1L, 2L, 3L));
-            final Set<Long> keyFilterSecondClient = 
keysFromInstance(streams2AlphaTwo);
-            final long potentiallySecondFailingKey = 
keyFilterSecondClient.iterator().next();
-            cleanKeys.remove(potentiallySecondFailingKey);
+            final Set<Long> keysSecondClientAlphaTwo = 
keysFromInstance(streams2AlphaTwo);
+            final long secondFailingKeyForCrashCase = 
keysSecondClientAlphaTwo.iterator().next();
+            cleanKeys.remove(secondFailingKeyForCrashCase);
 
             final List<KeyValue<Long, Long>> 
uncommittedInputDataBeforeSecondUpgrade = new LinkedList<>();
             if (!injectError) {
@@ -675,7 +671,7 @@ public class EosBetaUpgradeIntegrationTest {
                     
uncommittedInputDataWithoutFailingKey.addAll(prepareData(30L, 35L, key));
                 }
                 uncommittedInputDataWithoutFailingKey.addAll(
-                    prepareData(30L, 34L, potentiallySecondFailingKey)
+                    prepareData(30L, 34L, secondFailingKeyForCrashCase)
                 );
                 
uncommittedInputDataBeforeSecondUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
                 writeInputData(uncommittedInputDataWithoutFailingKey);
@@ -689,16 +685,17 @@ public class EosBetaUpgradeIntegrationTest {
             // phase 9: (stop/crash second client)
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec (pending)
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec (pending)
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
-            // crash case:  (we just assumes that we inject the error for p-2; 
in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec (pending)
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec (pending)
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 4 rec ---> A + 5 rec (pending)
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 5 rec ---> A + 5 rec (pending)
+            // stop case: (client 2 (alpha) will commit its two tasks on 
close())
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec 
(pending)
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec 
(pending)
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec ---> C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec ---> C
+            // crash case: (we write the last record that will trigger the 
crash; both TX from client 2 will be aborted
+            //              during fail over by client 1 and retried)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec (pending)
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec (pending)
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   4 rec ---> A + 5 rec (pending)
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   5 rec ---> A + 5 rec (pending)
             stateTransitions1.clear();
             assignmentListener.prepareForRebalance();
             if (!injectError) {
@@ -709,7 +706,7 @@ public class EosBetaUpgradeIntegrationTest {
                 errorInjectedClient2.set(true);
 
                 final List<KeyValue<Long, Long>> 
dataPotentiallySecondFailingKey =
-                    prepareData(34L, 35L, potentiallySecondFailingKey);
+                    prepareData(34L, 35L, secondFailingKeyForCrashCase);
                 
uncommittedInputDataBeforeSecondUpgrade.addAll(dataPotentiallySecondFailingKey);
                 writeInputData(dataPotentiallySecondFailingKey);
             }
@@ -720,7 +717,7 @@ public class EosBetaUpgradeIntegrationTest {
                 final List<KeyValue<Long, Long>> 
committedInputDataDuringSecondUpgrade =
                     uncommittedInputDataBeforeSecondUpgrade
                         .stream()
-                        .filter(pair -> 
keyFilterSecondClient.contains(pair.key))
+                        .filter(pair -> 
keysSecondClientAlphaTwo.contains(pair.key))
                         .collect(Collectors.toList());
 
                 final List<KeyValue<Long, Long>> expectedCommittedResult =
@@ -731,7 +728,7 @@ public class EosBetaUpgradeIntegrationTest {
                 expectedUncommittedResult.addAll(computeExpectedResult(
                     uncommittedInputDataBeforeSecondUpgrade
                         .stream()
-                        .filter(pair -> 
keyFilterSecondClient.contains(pair.key))
+                        .filter(pair -> 
keysSecondClientAlphaTwo.contains(pair.key))
                         .collect(Collectors.toList()),
                     new HashMap<>(committedState)
                 ));
@@ -749,18 +746,16 @@ public class EosBetaUpgradeIntegrationTest {
             // the state below indicate the case for which the "original" 
tasks of client2 are migrated back to client2
             // if a task "switch" happens, we might get additional commits 
(omitted in the comment for brevity)
             //
-            // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
-            // crash case:  (we just assumes that we inject the error for p-2; 
in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec ---> C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec ---> C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 4 rec + A + 5 rec ---> C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 5 rec + A + 5 rec ---> C
-            requestCommit.set(true);
-            waitForCondition(() -> !requestCommit.get(), "Punctuator did not 
request commit for running client");
+            // stop case: (client 1 (beta) will commit all four tasks if at 
least one revoked and migrated task needs committing back to client 2)
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec ---> C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec ---> C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C
+            // crash case: (client 1 (beta) will commit all four tasks even 
though only two migrate back to client 2)
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec ---> C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec ---> C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   4 rec + A + 5 rec ---> C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   5 rec + A + 5 rec ---> C
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
@@ -774,15 +769,18 @@ public class EosBetaUpgradeIntegrationTest {
             waitForRunning(stateTransitions1);
             waitForRunning(stateTransitions2);
 
-            committedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
+            newlyCommittedKeys.clear();
             if (!injectError) {
-                committedKeys.removeAll(keyFilterSecondClient);
+                newlyCommittedKeys.addAll(keysFromInstance(streams2Beta));
+                newlyCommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            } else {
+                newlyCommittedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
             }
 
             final List<KeyValue<Long, Long>> 
expectedCommittedResultAfterRestartSecondClient = computeExpectedResult(
                 uncommittedInputDataBeforeSecondUpgrade
                     .stream()
-                    .filter(pair -> committedKeys.contains(pair.key))
+                    .filter(pair -> newlyCommittedKeys.contains(pair.key))
                     .collect(Collectors.toList()),
                 committedState
             );
@@ -792,24 +790,35 @@ public class EosBetaUpgradeIntegrationTest {
             // expected end state per output partition (C == COMMIT; A == 
ABORT; ---> indicate the changes):
             //
             // stop case:
-            //   p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
-            //   p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 
rec + C
+            //   p-0: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + C   +   5 rec + C 
---> 5 rec + C
             // crash case:  (we just assumes that we inject the error for p-2; 
in reality it might be a different partition)
-            //   p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec 
+ A + 10 rec + C + 5 rec + C ---> 5 rec + C
-            //   p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 4 rec + A + 5 rec + C ---> 5 rec + C
-            //   p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec 
+ C + 5 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-0: 10 rec + C   +   4 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-1: 10 rec + C   +   5 rec + A + 5 rec + C + 5 rec + C   +   
10 rec + A + 10 rec + C   +   5 rec + C             ---> 5 rec + C
+            //   p-2: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   4 rec + A + 5 rec + C ---> 5 rec + C
+            //   p-3: 10 rec + C   +   5 rec + C + 5 rec + A + 5 rec + C   +   
10 rec + C                +   5 rec + A + 5 rec + C ---> 5 rec + C
             commitCounterClient1.set(-1);
             commitCounterClient2.set(-1);
 
-            final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+            final List<KeyValue<Long, Long>> finishLastBatch =
                 prepareData(35L, 40L, 0L, 1L, 2L, 3L);
-            writeInputData(committedInputDataAfterUpgrade);
+            writeInputData(finishLastBatch);
+
+            final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+            uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+            uncommittedKeys.removeAll(newlyCommittedKeys);
+            final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = 
uncommittedInputDataBeforeSecondUpgrade
+                .stream()
+                .filter(pair -> uncommittedKeys.contains(pair.key))
+                .collect(Collectors.toList());
+            committedInputDataDuringUpgrade.addAll(
+                finishLastBatch
+            );
 
             final List<KeyValue<Long, Long>> expectedCommittedResult =
-                computeExpectedResult(committedInputDataAfterUpgrade, 
committedState);
+                computeExpectedResult(committedInputDataDuringUpgrade, 
committedState);
             verifyCommitted(expectedCommittedResult);
         } finally {
             if (streams1Alpha != null) {
@@ -854,7 +863,6 @@ public class EosBetaUpgradeIntegrationTest {
                     KeyValueStore<Long, Long> state = null;
                     AtomicBoolean crash;
                     AtomicInteger sharedCommit;
-                    Cancellable punctuator;
 
                     @Override
                     public void init(final ProcessorContext context) {
@@ -868,11 +876,6 @@ public class EosBetaUpgradeIntegrationTest {
                             crash = errorInjectedClient2;
                             sharedCommit = commitCounterClient2;
                         }
-                        punctuator = context.schedule(
-                            Duration.ofMillis(100),
-                            PunctuationType.WALL_CLOCK_TIME,
-                            new CommitPunctuator(context, requestCommit)
-                        );
                     }
 
                     @Override
@@ -905,9 +908,7 @@ public class EosBetaUpgradeIntegrationTest {
                     }
 
                     @Override
-                    public void close() {
-                        punctuator.cancel();
-                    }
+                    public void close() {}
                 };
             } }, storeNames)
             .to(MULTI_PARTITION_OUTPUT_TOPIC);
@@ -916,11 +917,12 @@ public class EosBetaUpgradeIntegrationTest {
         properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
processingGuarantee);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
Long.MAX_VALUE);
-        
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
 "1000");
+        
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
 Duration.ofSeconds(1L).toMillis());
         
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "earliest");
-        
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
 5 * 1000);
-        
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
 5 * 1000 - 1);
+        
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
 (int) Duration.ofSeconds(5L).toMillis());
+        
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
 (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
         
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
 MAX_POLL_INTERVAL_MS);
+        
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
 (int) Duration.ofMinutes(1L).toMillis());
         
properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG),
 KeyPartitioner.class);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath() + File.separator + appDir);
@@ -1020,7 +1022,8 @@ public class EosBetaUpgradeIntegrationTest {
                     )
                 ),
                 MULTI_PARTITION_OUTPUT_TOPIC,
-                numberOfRecords
+                numberOfRecords,
+                MAX_WAIT_TIME_MS
             );
         }
 
@@ -1039,7 +1042,15 @@ public class EosBetaUpgradeIntegrationTest {
         addAllKeys(allKeys, expectedResult);
 
         for (final Long key : allKeys) {
-            assertThat(getAllRecordPerKey(key, result), 
equalTo(getAllRecordPerKey(key, expectedResult)));
+            try {
+                assertThat(getAllRecordPerKey(key, result), 
equalTo(getAllRecordPerKey(key, expectedResult)));
+            } catch (final AssertionError error) {
+                throw new AssertionError(
+                    "expected result: " + 
expectedResult.stream().map(KeyValue::toString).reduce("", (kv, str) -> 
str.isEmpty() ? kv : str + ", " + kv) +
+                    "\nreceived records: " + 
result.stream().map(KeyValue::toString).reduce("", (kv, str) -> str.isEmpty() ? 
kv : str + ", " + kv),
+                    error
+                );
+            }
         }
     }
 
@@ -1075,19 +1086,27 @@ public class EosBetaUpgradeIntegrationTest {
     }
 
     private Set<Long> keysFromInstance(final KafkaStreams streams) throws 
Exception {
-        final ReadOnlyKeyValueStore<Long, Long> store = getStore(
+        final Set<Long> keys = new HashSet<>();
+        waitForCondition(
+            () -> {
+                final ReadOnlyKeyValueStore<Long, Long> store = streams.store(
+                    StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore())
+                );
+
+                keys.clear();
+                try (final KeyValueIterator<Long, Long> it = store.all()) {
+                    while (it.hasNext()) {
+                        final KeyValue<Long, Long> row = it.next();
+                        keys.add(row.key);
+                    }
+                }
+
+                return true;
+            },
             MAX_WAIT_TIME_MS,
-            streams,
-            StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore())
+            "Could not get keys from store: " + storeName
         );
-        waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () -> 
"State store did not ready: " + storeName);
-        final Set<Long> keys = new HashSet<>();
-        try (final KeyValueIterator<Long, Long> it = store.all()) {
-            while (it.hasNext()) {
-                final KeyValue<Long, Long> row = it.next();
-                keys.add(row.key);
-            }
-        }
+
         return keys;
     }
 
@@ -1133,7 +1152,7 @@ public class EosBetaUpgradeIntegrationTest {
         }
 
         @Override
-        public void commitTransaction() throws ProducerFencedException {
+        public void commitTransaction() {
             super.flush(); // we flush to ensure that the offsets are written
             if (!crash.compareAndSet(true, false)) {
                 super.commitTransaction();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 03b287d..3de6a7d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1158,7 +1158,7 @@ public class IntegrationTestUtils {
     }
 
     private static boolean continueConsuming(final int messagesConsumed, final 
int maxMessages) {
-        return maxMessages <= 0 || messagesConsumed < maxMessages;
+        return maxMessages > 0 && messagesConsumed < maxMessages;
     }
 
     /**

Reply via email to