Repository: kafka
Updated Branches:
  refs/heads/trunk b760615f3 -> 005b86ecf
MINOR: Add random aborts to system test transactional copier service Author: Jason Gustafson <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #3340 from hachikuji/add-random-aborts-to-system-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/005b86ec Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/005b86ec Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/005b86ec Branch: refs/heads/trunk Commit: 005b86ecf3bd881cb2005c177c2555eeb2a049d9 Parents: b760615 Author: Jason Gustafson <[email protected]> Authored: Wed Jun 14 16:20:18 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed Jun 14 16:20:18 2017 -0700 ---------------------------------------------------------------------- .../services/transactional_message_copier.py | 7 ++- tests/kafkatest/tests/core/transactions_test.py | 4 +- .../kafka/tools/TransactionalMessageCopier.java | 52 +++++++++++++------- 3 files changed, 43 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/005b86ec/tests/kafkatest/services/transactional_message_copier.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py index 01d124b..7e96a2c 100644 --- a/tests/kafkatest/services/transactional_message_copier.py +++ b/tests/kafkatest/services/transactional_message_copier.py @@ -47,7 +47,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group, input_topic, input_partition, output_topic, max_messages = -1, - transaction_size = 1000): + transaction_size = 1000, enable_random_aborts=True): super(TransactionalMessageCopier, 
self).__init__(context, num_nodes) self.kafka = kafka self.transactional_id = transactional_id @@ -61,6 +61,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService self.consumed = -1 self.remaining = -1 self.stop_timeout_sec = 60 + self.enable_random_aborts = enable_random_aborts self.loggers = { "org.apache.kafka.clients.producer.internals": "TRACE" } @@ -117,6 +118,10 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService cmd += " --output-topic %s" % self.output_topic cmd += " --input-partition %s" % str(self.input_partition) cmd += " --transaction-size %s" % str(self.transaction_size) + + if self.enable_random_aborts: + cmd += " --enable-random-aborts" + if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE) http://git-wip-us.apache.org/repos/asf/kafka/blob/005b86ec/tests/kafkatest/tests/core/transactions_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index c284bb6..9ccb259 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -206,9 +206,9 @@ class TransactionsTest(Test): for copier in copiers: wait_until(lambda: copier.is_done, - timeout_sec=60, + timeout_sec=120, err_msg="%s - Failed to copy all messages in %ds." 
%\ - (copier.transactional_id, 60)) + (copier.transactional_id, 120)) self.logger.info("finished copying messages") return self.drain_consumer(concurrent_consumer) http://git-wip-us.apache.org/repos/asf/kafka/blob/005b86ec/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index d748ce4..3903a3a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -31,29 +31,34 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static java.util.Collections.singleton; import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; /** * This class is primarily meant for use with system tests. It copies messages from an input partition to an output * topic transactionally, committing the offsets and messages together. */ public class TransactionalMessageCopier { - /** Get the command-line argument parser. */ + + /** Get the command-line argument parser. 
*/ private static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers .newArgumentParser("transactional-message-copier") .defaultHelp(true) - .description("This tool copies messages transactionally from an input partition to an output topic, committing the consumed offsets along with the output messages"); + .description("This tool copies messages transactionally from an input partition to an output topic, " + + "committing the consumed offsets along with the output messages"); parser.addArgument("--input-topic") .action(store()) @@ -71,7 +76,6 @@ public class TransactionalMessageCopier { .dest("inputPartition") .help("Consume messages from this partition of the input topic."); - parser.addArgument("--output-topic") .action(store()) .required(true) @@ -116,7 +120,6 @@ public class TransactionalMessageCopier { .dest("messagesPerTransaction") .help("The number of messages to put in each transaction. Default is 200."); - parser.addArgument("--transactional-id") .action(store()) .required(true) @@ -125,6 +128,12 @@ public class TransactionalMessageCopier { .dest("transactionalId") .help("The transactionalId to assign to the producer"); + parser.addArgument("--enable-random-aborts") + .action(storeTrue()) + .type(Boolean.class) + .metavar("ENABLE-RANDOM-ABORTS") + .dest("enableRandomAborts") + .help("Whether or not to enable random transaction aborts (for system testing)"); return parser; } @@ -144,7 +153,7 @@ public class TransactionalMessageCopier { return new KafkaProducer<>(props); } - private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs, TopicPartition inputPartition) { + private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs) { String consumerGroup = parsedArgs.getString("consumerGroup"); String brokerList = parsedArgs.getString("brokerList"); Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction"); @@ -164,10 +173,7 @@ public class TransactionalMessageCopier { 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); - - return consumer; - + return new KafkaConsumer<>(props); } private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> record) { @@ -188,13 +194,13 @@ public class TransactionalMessageCopier { if (offsetAndMetadata != null) consumer.seek(topicPartition, offsetAndMetadata.offset()); else - consumer.seekToBeginning(Collections.singletonList(topicPartition)); + consumer.seekToBeginning(singleton(topicPartition)); } } private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) { long currentPosition = consumer.position(partition); - Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Arrays.asList(partition)); + Map<TopicPartition, Long> endOffsets = consumer.endOffsets(singleton(partition)); if (endOffsets.containsKey(partition)) { return endOffsets.get(partition) - currentPosition; } @@ -238,12 +244,14 @@ public class TransactionalMessageCopier { TopicPartition inputPartition = new TopicPartition(parsedArgs.getString("inputTopic"), parsedArgs.getInt("inputPartition")); final KafkaProducer<String, String> producer = createProducer(parsedArgs); - final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs, inputPartition); + final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs); - consumer.assign(Arrays.asList(inputPartition)); + consumer.assign(singleton(inputPartition)); long maxMessages = parsedArgs.getInt("maxMessages") == -1 ? 
Long.MAX_VALUE : parsedArgs.getInt("maxMessages"); maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages); + final boolean enableRandomAborts = parsedArgs.getBoolean("enableRandomAborts"); + producer.initTransactions(); @@ -264,12 +272,14 @@ public class TransactionalMessageCopier { }); try { + Random random = new Random(); while (0 < remainingMessages.get()) { System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId)); if (isShuttingDown.get()) break; int messagesInCurrentTransaction = 0; long numMessagesForNextTransaction = Math.min(numMessagesPerTransaction, remainingMessages.get()); + try { producer.beginTransaction(); while (messagesInCurrentTransaction < numMessagesForNextTransaction) { @@ -280,8 +290,16 @@ public class TransactionalMessageCopier { } } producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup); - producer.commitTransaction(); - remainingMessages.set(maxMessages - numMessagesProcessed.addAndGet(messagesInCurrentTransaction)); + + if (enableRandomAborts && random.nextInt() % 3 == 0) { + throw new KafkaException("Aborting transaction"); + } else { + producer.commitTransaction(); + remainingMessages.set(maxMessages - numMessagesProcessed.addAndGet(messagesInCurrentTransaction)); + } + } catch (ProducerFencedException | OutOfOrderSequenceException e) { + // We cannot recover from these errors, so just rethrow them and let the process fail + throw e; } catch (KafkaException e) { producer.abortTransaction(); resetToLastCommittedPositions(consumer);
