mjsax commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r417738801



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class EosBetaUpgradeIntegrationTest {
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+            {false},
+            {true}
+        });
+    }
+
+    @Parameterized.Parameter
+    public boolean injectError;
+
+    private static final int NUM_BROKERS = 3;
+    private static final int MAX_POLL_INTERVAL_MS = 100 * 1000;
+    private static final int MAX_WAIT_TIME_MS = 60 * 1000;
+
+    private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> TWO_REBALANCES_STARTUP =
+        Collections.unmodifiableList(
+            Arrays.asList(
+                KeyValue.pair(KafkaStreams.State.CREATED, 
KafkaStreams.State.REBALANCING),
+                KeyValue.pair(KafkaStreams.State.REBALANCING, 
KafkaStreams.State.RUNNING),
+                KeyValue.pair(KafkaStreams.State.RUNNING, 
KafkaStreams.State.REBALANCING),
+                KeyValue.pair(KafkaStreams.State.REBALANCING, 
KafkaStreams.State.RUNNING)
+            )
+        );
+    private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> TWO_REBALANCES_RUNNING =
+        Collections.unmodifiableList(
+            Arrays.asList(
+                KeyValue.pair(KafkaStreams.State.RUNNING, 
KafkaStreams.State.REBALANCING),
+                KeyValue.pair(KafkaStreams.State.REBALANCING, 
KafkaStreams.State.RUNNING),
+                KeyValue.pair(KafkaStreams.State.RUNNING, 
KafkaStreams.State.REBALANCING),
+                KeyValue.pair(KafkaStreams.State.REBALANCING, 
KafkaStreams.State.RUNNING)
+            )
+        );
+    private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> SINGLE_REBALANCE =
+        Collections.unmodifiableList(
+            Arrays.asList(
+                KeyValue.pair(KafkaStreams.State.RUNNING, 
KafkaStreams.State.REBALANCING),
+                KeyValue.pair(KafkaStreams.State.REBALANCING, 
KafkaStreams.State.RUNNING)
+            )
+        );
+    private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> CLOSE =
+        Collections.unmodifiableList(
+            Arrays.asList(
+                KeyValue.pair(KafkaStreams.State.RUNNING, 
KafkaStreams.State.PENDING_SHUTDOWN),
+                KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, 
KafkaStreams.State.NOT_RUNNING)
+            )
+        );
+    private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> CRASH =
+        Collections.unmodifiableList(
+            Collections.singletonList(
+                KeyValue.pair(KafkaStreams.State.RUNNING, 
KafkaStreams.State.ERROR)
+            )
+        );
+    private static final List<KeyValue<KafkaStreams.State, 
KafkaStreams.State>> CLOSE_CRASHED =
+        Collections.unmodifiableList(
+            Arrays.asList(
+                KeyValue.pair(KafkaStreams.State.ERROR, 
KafkaStreams.State.PENDING_SHUTDOWN),
+                KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, 
KafkaStreams.State.NOT_RUNNING)
+            )
+        );
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", 
"false"))
+    );
+
+    private static String applicationId;
+    private final static int NUM_TOPIC_PARTITIONS = 4;
+    private final static String CONSUMER_GROUP_ID = "readCommitted";
+    private final static String MULTI_PARTITION_INPUT_TOPIC = 
"multiPartitionInputTopic";
+    private final static String MULTI_PARTITION_OUTPUT_TOPIC = 
"multiPartitionOutputTopic";
+    private final String storeName = "store";
+
+    private final AtomicBoolean errorInjectedClient1 = new 
AtomicBoolean(false);
+    private final AtomicBoolean errorInjectedClient2 = new 
AtomicBoolean(false);
+    private final AtomicBoolean commitErrorInjectedClient1 = new 
AtomicBoolean(false);
+    private final AtomicBoolean commitErrorInjectedClient2 = new 
AtomicBoolean(false);
+    private final AtomicInteger commitCounterClient1 = new AtomicInteger(-1);
+    private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
+    private final AtomicInteger commitRequested = new AtomicInteger(0);
+
+    private Throwable uncaughtException;
+
+    private int testNumber = 0;
+
+    @Before
+    public void createTopics() throws Exception {
+        applicationId = "appId-" + ++testNumber;
+        CLUSTER.deleteTopicsAndWait(
+            MULTI_PARTITION_INPUT_TOPIC,
+            MULTI_PARTITION_OUTPUT_TOPIC,
+            applicationId + "-" + storeName + "-changelog"
+        );
+
+        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 
1);
+        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, 
NUM_TOPIC_PARTITIONS, 1);
+    }
+
+    @Test
+    public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
+        // We use two KafkaStreams clients that we upgrade from eos-alpha to 
eos-beta. During the upgrade,
+        // we ensure that there are pending transaction and verify that data 
is processed correctly.
+        //
+        // We either close clients cleanly (`injectError = false`) or let them 
crash (`injectError = true`) during
+        // the upgrade. For both cases, EOS should not be violated.
+        //
+        // Additionally, we inject errors while one client is on eos-alpha 
while the other client is on eos-beta:
+        // For this case, we inject the error during task commit phase, i.e., 
after offsets are appended to a TX,
+        // and before the TX is committed. The goal is to verify that the 
written but uncommitted offsets are not
+        // picked up, i.e., GroupCoordinator fencing works correctly.
+        //
+        // The commit interval is set to MAX_VALUE and the used `Processor` 
request commits manually so we have full
+        // control when a commit actually happens. We use an input topic with 
4 partitions and each task will request
+        // a commit after processing 10 records.
+        //
+        // 1.  start both clients and wait until rebalance stabilizes
+        // 2.  write 10 records per input topic partition and verify that the 
result was committed
+        // 3.  write 5 records per input topic partition to get pending 
transactions (verified via "read_uncommitted" mode)
+        //      - all 4 pending transactions are based on task producers
+        //      - we will get only 4 pending writes for one partition for the 
crash case as we crash processing the 5th record
+        // 4.  stop/crash the first client, wait until rebalance stabilizes:
+        //      - stop case:
+        //        * verify that the stopped client did commit its pending 
transaction during shutdown
+        //        * the second client will still have two pending transaction
+        //      - crash case:
+        //        * the pending transactions of the crashed client got aborted
+        //        * the second client will have four pending transactions
+        // 5.  restart the first client with eos-beta enabled and wait until 
rebalance stabilizes
+        //       - the rebalance should result in a commit of all tasks
+        // 6.  write 5 record per input topic partition
+        //       - stop case:
+        //         * verify that the result was committed
+        //       - crash case:
+        //         * fail the second (i.e., eos-alpha) client during commit
+        //         * the eos-beta client should not pickup the pending offsets
+        //         * verify uncommitted and committed result
+        // 7.  only for crash case:
+        //     7a. restart the second client in eos-alpha mode and wait until 
rebalance stabilizes
+        //     7b. write 10 records per input topic partition
+        //         * fail the first (i.e., eos-beta) client during commit
+        //         * the eos-alpha client should not pickup the pending offsets
+        //         * verify uncommitted and committed result
+        //     7c. restart the first client in eos-beta mode and wait until 
rebalance stabilizes
+        // 8.  write 5 records per input topic partition to get pending 
transactions (verified via "read_uncommitted" mode)
+        //      - 2 transactions are based on a task producer; one transaction 
is based on a thread producer
+        //      - we will get 4 pending writes for the crash case as we crash 
processing the 5th record
+        // 9.  stop/crash the second client and wait until rebalance 
stabilizes:
+        //      - stop only:
+        //        * verify that the stopped client did commit its pending 
transaction during shutdown
+        //        * the first client will still have one pending transaction
+        //      - crash case:
+        //        * the pending transactions of the crashed client got aborted
+        //        * the first client will have one pending transactions
+        // 10. restart the second client with eos-beta enabled and wait until 
rebalance stabilizes
+        //       - the rebalance should result in a commit of all tasks
+        // 11. write 5 record per input topic partition and verify that the 
result was committed
+
+        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> 
stateTransitions1 = new LinkedList<>();
+        KafkaStreams streams1Alpha = null;
+        KafkaStreams streams1Beta = null;
+        KafkaStreams streams1BetaTwo = null;
+
+        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> 
stateTransitions2 = new LinkedList<>();
+        KafkaStreams streams2Alpha = null;
+        KafkaStreams streams2AlphaTwo = null;
+        KafkaStreams streams2Beta = null;
+//        streams2Beta = getKafkaStreams("appDir2", 
StreamsConfig.EXACTLY_ONCE_BETA);
+//        streams2Beta.setStateListener((newState, oldState) -> 
stateTransitions2.add(KeyValue.pair(oldState, newState)));
+
+        try {

Review comment:
       We now use 6 clients in order to do some error injection in "mixed 
mode". To avoid JMX warnings, we cannot create all clients at the same time 
and thus cannot use try-with-resources...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to