This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 8bcb05d KAFKA-10017: fix flaky EOS-beta upgrade test (#9688)
8bcb05d is described below
commit 8bcb05d604e611d2396c4d22cc4573488e9edb6a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Dec 10 17:32:30 2020 -0800
KAFKA-10017: fix flaky EOS-beta upgrade test (#9688)
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang
<[email protected]>
---
.../integration/EosBetaUpgradeIntegrationTest.java | 363 +++++++++++----------
.../integration/utils/IntegrationTestUtils.java | 2 +-
2 files changed, 192 insertions(+), 173 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index d039d98..8129d05 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -43,10 +42,7 @@ import
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAss
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -83,7 +79,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
-import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -105,8 +100,8 @@ public class EosBetaUpgradeIntegrationTest {
public boolean injectError;
private static final int NUM_BROKERS = 3;
- private static final int MAX_POLL_INTERVAL_MS = 100 * 1000;
- private static final int MAX_WAIT_TIME_MS = 60 * 1000;
+ private static final int MAX_POLL_INTERVAL_MS = (int)
Duration.ofSeconds(100L).toMillis();
+ private static final long MAX_WAIT_TIME_MS =
Duration.ofMinutes(1L).toMillis();
private static final List<KeyValue<KafkaStreams.State,
KafkaStreams.State>> CLOSE =
Collections.unmodifiableList(
@@ -152,27 +147,6 @@ public class EosBetaUpgradeIntegrationTest {
private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
private final AtomicInteger commitRequested = new AtomicInteger(0);
- // Note: this pattern only works when we just have a single instance
running with a single thread
- // If we want to extend the test or reuse this CommitPunctuator we should
tighten it up
- private final AtomicBoolean requestCommit = new AtomicBoolean(false);
- private static class CommitPunctuator implements Punctuator {
- final ProcessorContext context;
- final AtomicBoolean requestCommit;
-
- public CommitPunctuator(final ProcessorContext context, final
AtomicBoolean requestCommit) {
- this.context = context;
- this.requestCommit = requestCommit;
- }
-
- @Override
- public void punctuate(final long timestamp) {
- if (requestCommit.get()) {
- context.commit();
- requestCommit.set(false);
- }
- }
- }
-
private Throwable uncaughtException;
private int testNumber = 0;
@@ -319,16 +293,18 @@ public class EosBetaUpgradeIntegrationTest {
// p-2: 10 rec + C ---> 5 rec (pending)
// p-3: 10 rec + C ---> 5 rec (pending)
// crash case: (we just assume that we inject the error for p-0;
in reality it might be a different partition)
+ // (we don't crash right away and write one record
less)
// p-0: 10 rec + C ---> 4 rec (pending)
// p-1: 10 rec + C ---> 5 rec (pending)
// p-2: 10 rec + C ---> 5 rec (pending)
// p-3: 10 rec + C ---> 5 rec (pending)
final Set<Long> cleanKeys = mkSet(0L, 1L, 2L, 3L);
- final Set<Long> keyFilterFirstClient =
keysFromInstance(streams1Alpha);
- final long potentiallyFirstFailingKey =
keyFilterFirstClient.iterator().next();
- cleanKeys.remove(potentiallyFirstFailingKey);
+ final Set<Long> keysFirstClientAlpha =
keysFromInstance(streams1Alpha);
+ final long firstFailingKeyForCrashCase =
keysFirstClientAlpha.iterator().next();
+ cleanKeys.remove(firstFailingKeyForCrashCase);
final List<KeyValue<Long, Long>>
uncommittedInputDataBeforeFirstUpgrade = new LinkedList<>();
+ final HashMap<Long, Long> uncommittedState = new
HashMap<>(committedState);
if (!injectError) {
uncommittedInputDataBeforeFirstUpgrade.addAll(
prepareData(10L, 15L, 0L, 1L, 2L, 3L)
@@ -336,7 +312,7 @@ public class EosBetaUpgradeIntegrationTest {
writeInputData(uncommittedInputDataBeforeFirstUpgrade);
expectedUncommittedResult.addAll(
-
computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, new
HashMap<>(committedState))
+
computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, uncommittedState)
);
verifyUncommitted(expectedUncommittedResult);
} else {
@@ -345,7 +321,7 @@ public class EosBetaUpgradeIntegrationTest {
uncommittedInputDataWithoutFailingKey.addAll(prepareData(10L, 15L, key));
}
uncommittedInputDataWithoutFailingKey.addAll(
- prepareData(10L, 14L, potentiallyFirstFailingKey)
+ prepareData(10L, 14L, firstFailingKeyForCrashCase)
);
uncommittedInputDataBeforeFirstUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
writeInputData(uncommittedInputDataWithoutFailingKey);
@@ -356,19 +332,20 @@ public class EosBetaUpgradeIntegrationTest {
verifyUncommitted(expectedUncommittedResult);
}
- // phase 4: (stop/crash first client)
+ // phase 4: (stop first client)
// expected end state per output partition (C == COMMIT; A ==
ABORT; ---> indicate the changes):
//
- // stop case:
- // p-0: 10 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec (pending)
- // crash case:
- // p-0: 10 rec + C + 4 rec ---> A + 5 rec (pending)
- // p-1: 10 rec + C + 5 rec ---> A + 5 rec (pending)
- // p-2: 10 rec + C + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec (pending)
+ // stop case: (client 1 will commit its two tasks on close())
+ // p-0: 10 rec + C + 5 rec ---> C
+ // p-1: 10 rec + C + 5 rec ---> C
+ // p-2: 10 rec + C + 5 rec (pending)
+ // p-3: 10 rec + C + 5 rec (pending)
+ // crash case: (we write the last record that will trigger the
crash; both TX from client 1 will be aborted
+ // during fail over by client 2 and retried)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec (pending)
+ // p-1: 10 rec + C + 5 rec + A + 5 rec (pending)
+ // p-2: 10 rec + C + 5 rec (pending)
+ // p-3: 10 rec + C + 5 rec (pending)
stateTransitions2.clear();
assignmentListener.prepareForRebalance();
@@ -380,7 +357,7 @@ public class EosBetaUpgradeIntegrationTest {
errorInjectedClient1.set(true);
final List<KeyValue<Long, Long>>
dataPotentiallyFirstFailingKey =
- prepareData(14L, 15L, potentiallyFirstFailingKey);
+ prepareData(14L, 15L, firstFailingKeyForCrashCase);
uncommittedInputDataBeforeFirstUpgrade.addAll(dataPotentiallyFirstFailingKey);
writeInputData(dataPotentiallyFirstFailingKey);
}
@@ -391,7 +368,7 @@ public class EosBetaUpgradeIntegrationTest {
final List<KeyValue<Long, Long>>
committedInputDataDuringFirstUpgrade =
uncommittedInputDataBeforeFirstUpgrade
.stream()
- .filter(pair ->
keyFilterFirstClient.contains(pair.key))
+ .filter(pair ->
keysFirstClientAlpha.contains(pair.key))
.collect(Collectors.toList());
final List<KeyValue<Long, Long>> expectedCommittedResult =
@@ -402,7 +379,7 @@ public class EosBetaUpgradeIntegrationTest {
expectedUncommittedResult.addAll(computeExpectedResult(
uncommittedInputDataBeforeFirstUpgrade
.stream()
- .filter(pair ->
keyFilterFirstClient.contains(pair.key))
+ .filter(pair ->
keysFirstClientAlpha.contains(pair.key))
.collect(Collectors.toList()),
new HashMap<>(committedState)
));
@@ -417,18 +394,20 @@ public class EosBetaUpgradeIntegrationTest {
// phase 5: (restart first client)
// expected end state per output partition (C == COMMIT; A ==
ABORT; ---> indicate the changes):
//
- // stop case:
- // p-0: 10 rec + C + 5 rec + C
- // p-1: 10 rec + C + 5 rec + C
- // p-2: 10 rec + C + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec ---> C
- // crash case:
- // p-0: 10 rec + C + 4 rec + A + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec ---> C
- requestCommit.set(true);
- waitForCondition(() -> !requestCommit.get(), "Punctuator did not
request commit for running client");
+        // stop case: (client 2 (alpha) will commit the two revoked tasks
that migrate back to client 1)
+ // (note: we may or may not get newly committed data,
depending if the already committed tasks
+ // migrate back to client 1, or different tasks)
+ // (below we show the case for which we don't get newly
committed data)
+ // p-0: 10 rec + C + 5 rec ---> C
+ // p-1: 10 rec + C + 5 rec ---> C
+ // p-2: 10 rec + C + 5 rec (pending)
+ // p-3: 10 rec + C + 5 rec (pending)
+        // crash case: (client 2 (alpha) will commit all tasks even though only
two tasks are revoked and migrated back to client 1)
+ // (note: because nothing was committed originally, we
always get newly committed data)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec ---> C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
+ // p-2: 10 rec + C + 5 rec ---> C
+ // p-3: 10 rec + C + 5 rec ---> C
commitRequested.set(0);
stateTransitions1.clear();
stateTransitions2.clear();
@@ -440,15 +419,18 @@ public class EosBetaUpgradeIntegrationTest {
waitForRunning(stateTransitions1);
waitForRunning(stateTransitions2);
- final Set<Long> committedKeys = mkSet(0L, 1L, 2L, 3L);
+ final Set<Long> newlyCommittedKeys;
if (!injectError) {
- committedKeys.removeAll(keyFilterFirstClient);
+ newlyCommittedKeys = keysFromInstance(streams1Beta);
+ newlyCommittedKeys.removeAll(keysFirstClientAlpha);
+ } else {
+ newlyCommittedKeys = mkSet(0L, 1L, 2L, 3L);
}
final List<KeyValue<Long, Long>>
expectedCommittedResultAfterRestartFirstClient = computeExpectedResult(
uncommittedInputDataBeforeFirstUpgrade
.stream()
- .filter(pair -> committedKeys.contains(pair.key))
+ .filter(pair -> newlyCommittedKeys.contains(pair.key))
.collect(Collectors.toList()),
committedState
);
@@ -457,33 +439,46 @@ public class EosBetaUpgradeIntegrationTest {
// phase 6: (complete second batch of data; crash: let second
client fail on commit)
// expected end state per output partition (C == COMMIT; A ==
ABORT; ---> indicate the changes):
//
- // stop case:
- // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C ---> 5 rec + C
- // crash case:
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
- // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
+        // stop case: (both clients commit regularly)
+ // (depending on the task movement in phase 5, we may
or may not get newly committed data;
+ // we show the case for which p-2 and p-3 are newly
committed below)
+ // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec ---> 5 rec + C
+ // p-3: 10 rec + C + 5 rec ---> 5 rec + C
+ // crash case: (second/alpha client fails and both TX are aborted)
+ // (first/beta client reprocessed the 10 records and
commits TX)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A +
5 rec + C
+ // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A +
5 rec + C
commitCounterClient1.set(0);
if (!injectError) {
- final List<KeyValue<Long, Long>>
committedInputDataDuringUpgrade =
- prepareData(15L, 20L, 0L, 1L, 2L, 3L);
- writeInputData(committedInputDataDuringUpgrade);
+ final List<KeyValue<Long, Long>> finishSecondBatch =
prepareData(15L, 20L, 0L, 1L, 2L, 3L);
+ writeInputData(finishSecondBatch);
+
+ final List<KeyValue<Long, Long>>
committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade
+ .stream()
+ .filter(pair -> !keysFirstClientAlpha.contains(pair.key))
+ .filter(pair -> !newlyCommittedKeys.contains(pair.key))
+ .collect(Collectors.toList());
+ committedInputDataDuringUpgrade.addAll(
+ finishSecondBatch
+ );
+ expectedUncommittedResult.addAll(
+ computeExpectedResult(finishSecondBatch, uncommittedState)
+ );
final List<KeyValue<Long, Long>> expectedCommittedResult =
computeExpectedResult(committedInputDataDuringUpgrade,
committedState);
verifyCommitted(expectedCommittedResult);
- expectedUncommittedResult.addAll(expectedCommittedResult);
} else {
- final Set<Long> keysFirstClient =
keysFromInstance(streams1Beta);
- final Set<Long> keysSecondClient =
keysFromInstance(streams2Alpha);
+ final Set<Long> keysFirstClientBeta =
keysFromInstance(streams1Beta);
+ final Set<Long> keysSecondClientAlpha =
keysFromInstance(streams2Alpha);
final List<KeyValue<Long, Long>>
committedInputDataAfterFirstUpgrade =
- prepareData(15L, 20L, keysFirstClient.toArray(new
Long[0]));
+ prepareData(15L, 20L, keysFirstClientBeta.toArray(new
Long[0]));
writeInputData(committedInputDataAfterFirstUpgrade);
final List<KeyValue<Long, Long>>
expectedCommittedResultBeforeFailure =
@@ -493,16 +488,16 @@ public class EosBetaUpgradeIntegrationTest {
commitCounterClient2.set(0);
- final Iterator<Long> it = keysSecondClient.iterator();
+ final Iterator<Long> it = keysSecondClientAlpha.iterator();
final Long otherKey = it.next();
final Long failingKey = it.next();
final List<KeyValue<Long, Long>>
uncommittedInputDataAfterFirstUpgrade =
- prepareData(15L, 19L, keysSecondClient.toArray(new
Long[0]));
+ prepareData(15L, 19L, keysSecondClientAlpha.toArray(new
Long[0]));
uncommittedInputDataAfterFirstUpgrade.addAll(prepareData(19L,
20L, otherKey));
writeInputData(uncommittedInputDataAfterFirstUpgrade);
- final Map<Long, Long> uncommittedState = new
HashMap<>(committedState);
+ uncommittedState.putAll(committedState);
expectedUncommittedResult.addAll(
computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, uncommittedState)
);
@@ -540,7 +535,7 @@ public class EosBetaUpgradeIntegrationTest {
}
// 7. only for crash case:
- // 7a. restart the second client in eos-alpha mode and wait
until rebalance stabilizes
+ // 7a. restart the failed second client in eos-alpha mode and
wait until rebalance stabilizes
// 7b. write third batch of input data
// * fail the first (i.e., eos-beta) client during commit
// * the eos-alpha client should not pickup the pending
offsets
@@ -548,10 +543,10 @@ public class EosBetaUpgradeIntegrationTest {
// 7c. restart the first client in eos-beta mode and wait
until rebalance stabilizes
//
// crash case:
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10
rec + A + 10 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10
rec + A + 10 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10
rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10
rec + C
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C --->
10 rec + A + 10 rec + C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C --->
10 rec + A + 10 rec + C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C --->
10 rec + C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C --->
10 rec + C
if (!injectError) {
streams2AlphaTwo = streams2Alpha;
} else {
@@ -571,11 +566,11 @@ public class EosBetaUpgradeIntegrationTest {
waitForRunning(stateTransitions2);
// 7b. write third batch of input data
- final Set<Long> keysFirstClient =
keysFromInstance(streams1Beta);
- final Set<Long> keysSecondClient =
keysFromInstance(streams2AlphaTwo);
+ final Set<Long> keysFirstClientBeta =
keysFromInstance(streams1Beta);
+ final Set<Long> keysSecondClientAlphaTwo =
keysFromInstance(streams2AlphaTwo);
final List<KeyValue<Long, Long>>
committedInputDataBetweenUpgrades =
- prepareData(20L, 30L, keysSecondClient.toArray(new
Long[0]));
+ prepareData(20L, 30L, keysSecondClientAlphaTwo.toArray(new
Long[0]));
writeInputData(committedInputDataBetweenUpgrades);
final List<KeyValue<Long, Long>>
expectedCommittedResultBeforeFailure =
@@ -585,16 +580,16 @@ public class EosBetaUpgradeIntegrationTest {
commitCounterClient2.set(0);
- final Iterator<Long> it = keysFirstClient.iterator();
+ final Iterator<Long> it = keysFirstClientBeta.iterator();
final Long otherKey = it.next();
final Long failingKey = it.next();
final List<KeyValue<Long, Long>>
uncommittedInputDataBetweenUpgrade =
- prepareData(20L, 29L, keysFirstClient.toArray(new
Long[0]));
+ prepareData(20L, 29L, keysFirstClientBeta.toArray(new
Long[0]));
uncommittedInputDataBetweenUpgrade.addAll(prepareData(29L,
30L, otherKey));
writeInputData(uncommittedInputDataBetweenUpgrade);
- final Map<Long, Long> uncommittedState = new
HashMap<>(committedState);
+ uncommittedState.putAll(committedState);
expectedUncommittedResult.addAll(
computeExpectedResult(uncommittedInputDataBetweenUpgrade,
uncommittedState)
);
@@ -640,23 +635,24 @@ public class EosBetaUpgradeIntegrationTest {
waitForRunning(stateTransitions2);
}
- // phase 8: (write partial fourth batch of data)
+ // phase 8: (write partial last batch of data)
// expected end state per output partition (C == COMMIT; A ==
ABORT; ---> indicate the changes):
//
// stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // crash case: (we just assumes that we inject the error for p-2;
in reality it might be a different partition)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C ---> 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C ---> 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C ---> 4 rec (pending)
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C ---> 5 rec (pending)
+ // p-0: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec
(pending)
+ // p-1: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec
(pending)
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec
(pending)
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec
(pending)
+        // crash case: (we just assume that we inject the error for p-2;
in reality it might be a different partition)
+ // (we don't crash right away and write one record
less)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C ---> 5 rec (pending)
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C ---> 5 rec (pending)
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C ---> 4 rec (pending)
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C ---> 5 rec (pending)
cleanKeys.addAll(mkSet(0L, 1L, 2L, 3L));
- final Set<Long> keyFilterSecondClient =
keysFromInstance(streams2AlphaTwo);
- final long potentiallySecondFailingKey =
keyFilterSecondClient.iterator().next();
- cleanKeys.remove(potentiallySecondFailingKey);
+ final Set<Long> keysSecondClientAlphaTwo =
keysFromInstance(streams2AlphaTwo);
+ final long secondFailingKeyForCrashCase =
keysSecondClientAlphaTwo.iterator().next();
+ cleanKeys.remove(secondFailingKeyForCrashCase);
final List<KeyValue<Long, Long>>
uncommittedInputDataBeforeSecondUpgrade = new LinkedList<>();
if (!injectError) {
@@ -675,7 +671,7 @@ public class EosBetaUpgradeIntegrationTest {
uncommittedInputDataWithoutFailingKey.addAll(prepareData(30L, 35L, key));
}
uncommittedInputDataWithoutFailingKey.addAll(
- prepareData(30L, 34L, potentiallySecondFailingKey)
+ prepareData(30L, 34L, secondFailingKeyForCrashCase)
);
uncommittedInputDataBeforeSecondUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
writeInputData(uncommittedInputDataWithoutFailingKey);
@@ -689,16 +685,17 @@ public class EosBetaUpgradeIntegrationTest {
// phase 9: (stop/crash second client)
// expected end state per output partition (C == COMMIT; A ==
ABORT; ---> indicate the changes):
//
- // stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // crash case: (we just assumes that we inject the error for p-2;
in reality it might be a different partition)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C + 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C + 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C + 4 rec ---> A + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C + 5 rec ---> A + 5 rec (pending)
+ // stop case: (client 2 (alpha) will commit its two tasks on
close())
+ // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec
(pending)
+ // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec
(pending)
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
+ // crash case: (we write the last record that will trigger the
crash; both TX from client 2 will be aborted
+ // during fail over by client 1 and retried)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C + 5 rec (pending)
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C + 5 rec (pending)
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C + 4 rec ---> A + 5 rec (pending)
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C + 5 rec ---> A + 5 rec (pending)
stateTransitions1.clear();
assignmentListener.prepareForRebalance();
if (!injectError) {
@@ -709,7 +706,7 @@ public class EosBetaUpgradeIntegrationTest {
errorInjectedClient2.set(true);
final List<KeyValue<Long, Long>>
dataPotentiallySecondFailingKey =
- prepareData(34L, 35L, potentiallySecondFailingKey);
+ prepareData(34L, 35L, secondFailingKeyForCrashCase);
uncommittedInputDataBeforeSecondUpgrade.addAll(dataPotentiallySecondFailingKey);
writeInputData(dataPotentiallySecondFailingKey);
}
@@ -720,7 +717,7 @@ public class EosBetaUpgradeIntegrationTest {
final List<KeyValue<Long, Long>>
committedInputDataDuringSecondUpgrade =
uncommittedInputDataBeforeSecondUpgrade
.stream()
- .filter(pair ->
keyFilterSecondClient.contains(pair.key))
+ .filter(pair ->
keysSecondClientAlphaTwo.contains(pair.key))
.collect(Collectors.toList());
final List<KeyValue<Long, Long>> expectedCommittedResult =
@@ -731,7 +728,7 @@ public class EosBetaUpgradeIntegrationTest {
expectedUncommittedResult.addAll(computeExpectedResult(
uncommittedInputDataBeforeSecondUpgrade
.stream()
- .filter(pair ->
keyFilterSecondClient.contains(pair.key))
+ .filter(pair ->
keysSecondClientAlphaTwo.contains(pair.key))
.collect(Collectors.toList()),
new HashMap<>(committedState)
));
@@ -749,18 +746,16 @@ public class EosBetaUpgradeIntegrationTest {
// the state below indicate the case for which the "original"
tasks of client2 are migrated back to client2
// if a task "switch" happens, we might get additional commits
(omitted in the comment for brevity)
//
- // stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
- // crash case: (we just assumes that we inject the error for p-2;
in reality it might be a different partition)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C + 4 rec + A + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C + 5 rec + A + 5 rec ---> C
- requestCommit.set(true);
- waitForCondition(() -> !requestCommit.get(), "Punctuator did not
request commit for running client");
+        // stop case: (client 1 (beta) will commit all four tasks if at
least one revoked and migrated task needs committing back to client 2)
+ // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
+ // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
+        // crash case: (client 1 (beta) will commit all four tasks even
though only two are migrated back to client 2)
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C + 5 rec ---> C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C + 5 rec ---> C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C + 4 rec + A + 5 rec ---> C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C + 5 rec + A + 5 rec ---> C
commitRequested.set(0);
stateTransitions1.clear();
stateTransitions2.clear();
@@ -774,15 +769,18 @@ public class EosBetaUpgradeIntegrationTest {
waitForRunning(stateTransitions1);
waitForRunning(stateTransitions2);
- committedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
+ newlyCommittedKeys.clear();
if (!injectError) {
- committedKeys.removeAll(keyFilterSecondClient);
+ newlyCommittedKeys.addAll(keysFromInstance(streams2Beta));
+ newlyCommittedKeys.removeAll(keysSecondClientAlphaTwo);
+ } else {
+ newlyCommittedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
}
final List<KeyValue<Long, Long>>
expectedCommittedResultAfterRestartSecondClient = computeExpectedResult(
uncommittedInputDataBeforeSecondUpgrade
.stream()
- .filter(pair -> committedKeys.contains(pair.key))
+ .filter(pair -> newlyCommittedKeys.contains(pair.key))
.collect(Collectors.toList()),
committedState
);
@@ -792,24 +790,35 @@ public class EosBetaUpgradeIntegrationTest {
// expected end state per output partition (C == COMMIT; A ==
ABORT; ---> indicate the changes):
//
// stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5
rec + C
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5
rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5
rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5
rec + C
+ // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
---> 5 rec + C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
---> 5 rec + C
// crash case: (we just assume that we inject the error for p-2;
in reality it might be a different partition)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec
+ A + 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C + 4 rec + A + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec
+ C + 5 rec + A + 5 rec + C ---> 5 rec + C
+ // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C +
10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
+ // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
+ // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C +
10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
commitCounterClient1.set(-1);
commitCounterClient2.set(-1);
- final List<KeyValue<Long, Long>> committedInputDataAfterUpgrade =
+ final List<KeyValue<Long, Long>> finishLastBatch =
prepareData(35L, 40L, 0L, 1L, 2L, 3L);
- writeInputData(committedInputDataAfterUpgrade);
+ writeInputData(finishLastBatch);
+
+ final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
+ uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
+ uncommittedKeys.removeAll(newlyCommittedKeys);
+ final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade =
uncommittedInputDataBeforeSecondUpgrade
+ .stream()
+ .filter(pair -> uncommittedKeys.contains(pair.key))
+ .collect(Collectors.toList());
+ committedInputDataDuringUpgrade.addAll(
+ finishLastBatch
+ );
final List<KeyValue<Long, Long>> expectedCommittedResult =
- computeExpectedResult(committedInputDataAfterUpgrade,
committedState);
+ computeExpectedResult(committedInputDataDuringUpgrade,
committedState);
verifyCommitted(expectedCommittedResult);
} finally {
if (streams1Alpha != null) {
@@ -854,7 +863,6 @@ public class EosBetaUpgradeIntegrationTest {
KeyValueStore<Long, Long> state = null;
AtomicBoolean crash;
AtomicInteger sharedCommit;
- Cancellable punctuator;
@Override
public void init(final ProcessorContext context) {
@@ -868,11 +876,6 @@ public class EosBetaUpgradeIntegrationTest {
crash = errorInjectedClient2;
sharedCommit = commitCounterClient2;
}
- punctuator = context.schedule(
- Duration.ofMillis(100),
- PunctuationType.WALL_CLOCK_TIME,
- new CommitPunctuator(context, requestCommit)
- );
}
@Override
@@ -905,9 +908,7 @@ public class EosBetaUpgradeIntegrationTest {
}
@Override
- public void close() {
- punctuator.cancel();
- }
+ public void close() {}
};
} }, storeNames)
.to(MULTI_PARTITION_OUTPUT_TOPIC);
@@ -916,11 +917,12 @@ public class EosBetaUpgradeIntegrationTest {
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingGuarantee);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
Long.MAX_VALUE);
-
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
"1000");
+
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
Duration.ofSeconds(1L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest");
-
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
5 * 1000);
-
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
5 * 1000 - 1);
+
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
(int) Duration.ofSeconds(5L).toMillis());
+
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
(int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
MAX_POLL_INTERVAL_MS);
+
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
(int) Duration.ofMinutes(1L).toMillis());
properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG),
KeyPartitioner.class);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath() + File.separator + appDir);
@@ -1020,7 +1022,8 @@ public class EosBetaUpgradeIntegrationTest {
)
),
MULTI_PARTITION_OUTPUT_TOPIC,
- numberOfRecords
+ numberOfRecords,
+ MAX_WAIT_TIME_MS
);
}
@@ -1039,7 +1042,15 @@ public class EosBetaUpgradeIntegrationTest {
addAllKeys(allKeys, expectedResult);
for (final Long key : allKeys) {
- assertThat(getAllRecordPerKey(key, result),
equalTo(getAllRecordPerKey(key, expectedResult)));
+ try {
+ assertThat(getAllRecordPerKey(key, result),
equalTo(getAllRecordPerKey(key, expectedResult)));
+ } catch (final AssertionError error) {
+ throw new AssertionError(
+ "expected result: " +
expectedResult.stream().map(KeyValue::toString).reduce("", (kv, str) ->
str.isEmpty() ? kv : str + ", " + kv) +
+ "\nreceived records: " +
result.stream().map(KeyValue::toString).reduce("", (kv, str) -> str.isEmpty() ?
kv : str + ", " + kv),
+ error
+ );
+ }
}
}
@@ -1075,19 +1086,27 @@ public class EosBetaUpgradeIntegrationTest {
}
private Set<Long> keysFromInstance(final KafkaStreams streams) throws
Exception {
- final ReadOnlyKeyValueStore<Long, Long> store = getStore(
+ final Set<Long> keys = new HashSet<>();
+ waitForCondition(
+ () -> {
+ final ReadOnlyKeyValueStore<Long, Long> store = streams.store(
+ StoreQueryParameters.fromNameAndType(storeName,
QueryableStoreTypes.keyValueStore())
+ );
+
+ keys.clear();
+ try (final KeyValueIterator<Long, Long> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<Long, Long> row = it.next();
+ keys.add(row.key);
+ }
+ }
+
+ return true;
+ },
MAX_WAIT_TIME_MS,
- streams,
- StoreQueryParameters.fromNameAndType(storeName,
QueryableStoreTypes.keyValueStore())
+ "Could not get keys from store: " + storeName
);
- waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () ->
"State store did not ready: " + storeName);
- final Set<Long> keys = new HashSet<>();
- try (final KeyValueIterator<Long, Long> it = store.all()) {
- while (it.hasNext()) {
- final KeyValue<Long, Long> row = it.next();
- keys.add(row.key);
- }
- }
+
return keys;
}
@@ -1133,7 +1152,7 @@ public class EosBetaUpgradeIntegrationTest {
}
@Override
- public void commitTransaction() throws ProducerFencedException {
+ public void commitTransaction() {
super.flush(); // we flush to ensure that the offsets are written
if (!crash.compareAndSet(true, false)) {
super.commitTransaction();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 03b287d..3de6a7d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1158,7 +1158,7 @@ public class IntegrationTestUtils {
}
private static boolean continueConsuming(final int messagesConsumed, final
int maxMessages) {
- return maxMessages <= 0 || messagesConsumed < maxMessages;
+ return maxMessages > 0 && messagesConsumed < maxMessages;
}
/**