Repository: kafka
Updated Branches:
  refs/heads/trunk b760615f3 -> 005b86ecf


MINOR: Add random aborts to system test transactional copier service

Author: Jason Gustafson <[email protected]>

Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]>

Closes #3340 from hachikuji/add-random-aborts-to-system-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/005b86ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/005b86ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/005b86ec

Branch: refs/heads/trunk
Commit: 005b86ecf3bd881cb2005c177c2555eeb2a049d9
Parents: b760615
Author: Jason Gustafson <[email protected]>
Authored: Wed Jun 14 16:20:18 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Wed Jun 14 16:20:18 2017 -0700

----------------------------------------------------------------------
 .../services/transactional_message_copier.py    |  7 ++-
 tests/kafkatest/tests/core/transactions_test.py |  4 +-
 .../kafka/tools/TransactionalMessageCopier.java | 52 +++++++++++++-------
 3 files changed, 43 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/005b86ec/tests/kafkatest/services/transactional_message_copier.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/transactional_message_copier.py 
b/tests/kafkatest/services/transactional_message_copier.py
index 01d124b..7e96a2c 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -47,7 +47,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, 
BackgroundThreadService
 
     def __init__(self, context, num_nodes, kafka, transactional_id, 
consumer_group,
                  input_topic, input_partition, output_topic, max_messages = -1,
-                 transaction_size = 1000):
+                 transaction_size = 1000, enable_random_aborts=True):
         super(TransactionalMessageCopier, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.transactional_id = transactional_id
@@ -61,6 +61,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, 
BackgroundThreadService
         self.consumed = -1
         self.remaining = -1
         self.stop_timeout_sec = 60
+        self.enable_random_aborts = enable_random_aborts
         self.loggers = {
             "org.apache.kafka.clients.producer.internals": "TRACE"
         }
@@ -117,6 +118,10 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, 
BackgroundThreadService
         cmd += " --output-topic %s" % self.output_topic
         cmd += " --input-partition %s" % str(self.input_partition)
         cmd += " --transaction-size %s" % str(self.transaction_size)
+
+        if self.enable_random_aborts:
+            cmd += " --enable-random-aborts"
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         cmd += " 2>> %s | tee -a %s &" % 
(TransactionalMessageCopier.STDERR_CAPTURE, 
TransactionalMessageCopier.STDOUT_CAPTURE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/005b86ec/tests/kafkatest/tests/core/transactions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/transactions_test.py 
b/tests/kafkatest/tests/core/transactions_test.py
index c284bb6..9ccb259 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -206,9 +206,9 @@ class TransactionsTest(Test):
 
         for copier in copiers:
             wait_until(lambda: copier.is_done,
-                       timeout_sec=60,
+                       timeout_sec=120,
                        err_msg="%s - Failed to copy all messages in  %ds." %\
-                       (copier.transactional_id, 60))
+                       (copier.transactional_id, 120))
         self.logger.info("finished copying messages")
         return self.drain_consumer(concurrent_consumer)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/005b86ec/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java 
b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index d748ce4..3903a3a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -31,29 +31,34 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static java.util.Collections.singleton;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
 /**
  * This class is primarily meant for use with system tests. It copies messages 
from an input partition to an output
  * topic transactionally, committing the offsets and messages together.
  */
 public class TransactionalMessageCopier {
-     /** Get the command-line argument parser. */
+
+    /** Get the command-line argument parser. */
     private static ArgumentParser argParser() {
         ArgumentParser parser = ArgumentParsers
                 .newArgumentParser("transactional-message-copier")
                 .defaultHelp(true)
-                .description("This tool copies messages transactionally from 
an input partition to an output topic, committing the consumed offsets along 
with the output messages");
+                .description("This tool copies messages transactionally from 
an input partition to an output topic, " +
+                        "committing the consumed offsets along with the output 
messages");
 
         parser.addArgument("--input-topic")
                 .action(store())
@@ -71,7 +76,6 @@ public class TransactionalMessageCopier {
                 .dest("inputPartition")
                 .help("Consume messages from this partition of the input 
topic.");
 
-
         parser.addArgument("--output-topic")
                 .action(store())
                 .required(true)
@@ -116,7 +120,6 @@ public class TransactionalMessageCopier {
                 .dest("messagesPerTransaction")
                 .help("The number of messages to put in each transaction. 
Default is 200.");
 
-
         parser.addArgument("--transactional-id")
                 .action(store())
                 .required(true)
@@ -125,6 +128,12 @@ public class TransactionalMessageCopier {
                 .dest("transactionalId")
                 .help("The transactionalId to assign to the producer");
 
+        parser.addArgument("--enable-random-aborts")
+                .action(storeTrue())
+                .type(Boolean.class)
+                .metavar("ENABLE-RANDOM-ABORTS")
+                .dest("enableRandomAborts")
+                .help("Whether or not to enable random transaction aborts (for 
system testing)");
 
         return parser;
     }
@@ -144,7 +153,7 @@ public class TransactionalMessageCopier {
         return new KafkaProducer<>(props);
     }
 
-    private static KafkaConsumer<String, String> createConsumer(Namespace 
parsedArgs, TopicPartition inputPartition) {
+    private static KafkaConsumer<String, String> createConsumer(Namespace 
parsedArgs) {
         String consumerGroup = parsedArgs.getString("consumerGroup");
         String brokerList = parsedArgs.getString("brokerList");
         Integer numMessagesPerTransaction = 
parsedArgs.getInt("messagesPerTransaction");
@@ -164,10 +173,7 @@ public class TransactionalMessageCopier {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
-        return consumer;
-
+        return new KafkaConsumer<>(props);
     }
 
     private static ProducerRecord<String, String> 
producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> 
record) {
@@ -188,13 +194,13 @@ public class TransactionalMessageCopier {
             if (offsetAndMetadata != null)
                 consumer.seek(topicPartition, offsetAndMetadata.offset());
             else
-                
consumer.seekToBeginning(Collections.singletonList(topicPartition));
+                consumer.seekToBeginning(singleton(topicPartition));
         }
     }
 
     private static long messagesRemaining(KafkaConsumer<String, String> 
consumer, TopicPartition partition) {
         long currentPosition = consumer.position(partition);
-        Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(Arrays.asList(partition));
+        Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(singleton(partition));
         if (endOffsets.containsKey(partition)) {
             return endOffsets.get(partition) - currentPosition;
         }
@@ -238,12 +244,14 @@ public class TransactionalMessageCopier {
         TopicPartition inputPartition = new 
TopicPartition(parsedArgs.getString("inputTopic"), 
parsedArgs.getInt("inputPartition"));
 
         final KafkaProducer<String, String> producer = 
createProducer(parsedArgs);
-        final KafkaConsumer<String, String> consumer = 
createConsumer(parsedArgs, inputPartition);
+        final KafkaConsumer<String, String> consumer = 
createConsumer(parsedArgs);
 
-        consumer.assign(Arrays.asList(inputPartition));
+        consumer.assign(singleton(inputPartition));
 
         long maxMessages = parsedArgs.getInt("maxMessages") == -1 ? 
Long.MAX_VALUE : parsedArgs.getInt("maxMessages");
         maxMessages = Math.min(messagesRemaining(consumer, inputPartition), 
maxMessages);
+        final boolean enableRandomAborts = 
parsedArgs.getBoolean("enableRandomAborts");
+
 
         producer.initTransactions();
 
@@ -264,12 +272,14 @@ public class TransactionalMessageCopier {
         });
 
         try {
+            Random random = new Random();
             while (0 < remainingMessages.get()) {
                 System.out.println(statusAsJson(numMessagesProcessed.get(), 
remainingMessages.get(), transactionalId));
                 if (isShuttingDown.get())
                     break;
                 int messagesInCurrentTransaction = 0;
                 long numMessagesForNextTransaction = 
Math.min(numMessagesPerTransaction, remainingMessages.get());
+
                 try {
                     producer.beginTransaction();
                     while (messagesInCurrentTransaction < 
numMessagesForNextTransaction) {
@@ -280,8 +290,16 @@ public class TransactionalMessageCopier {
                         }
                     }
                     
producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
-                    producer.commitTransaction();
-                    remainingMessages.set(maxMessages - 
numMessagesProcessed.addAndGet(messagesInCurrentTransaction));
+
+                    if (enableRandomAborts && random.nextInt() % 3 == 0) {
+                        throw new KafkaException("Aborting transaction");
+                    } else {
+                        producer.commitTransaction();
+                        remainingMessages.set(maxMessages - 
numMessagesProcessed.addAndGet(messagesInCurrentTransaction));
+                    }
+                } catch (ProducerFencedException | OutOfOrderSequenceException 
e) {
+                    // We cannot recover from these errors, so just rethrow 
them and let the process fail
+                    throw e;
                 } catch (KafkaException e) {
                     producer.abortTransaction();
                     resetToLastCommittedPositions(consumer);

Reply via email to the mailing list.