[
https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gwen Shapira updated KAFKA-1898:
--------------------------------
Fix Version/s: (was: 0.8.3)
> compatibility testing framework
> --------------------------------
>
> Key: KAFKA-1898
> URL: https://issues.apache.org/jira/browse/KAFKA-1898
> Project: Kafka
> Issue Type: Bug
> Reporter: Joe Stein
> Attachments: cctk.png
>
>
> There are a few different scenarios where you want/need to know the
> status/state of a client library that works with Kafka. Client library
> development is not just about supporting the wire protocol but also the
> implementations around specific interactions of the API. The API has
> blossomed into a robust set of producer, consumer, broker and administrative
> calls, all of which have layers of logic above them. A client library may
> choose to deviate from the path the project sets out, and that is ok. The goal
> of this ticket is to have a system for Kafka that can help to explain what
> the library is or isn't doing (regardless of what it claims).
> The idea behind this stems from being able to quickly/easily/succinctly analyze
> the topic message data. Once you can analyze the topics' messages you can
> gather lots of information about what the client library is doing, what it is
> not doing, and so on. There are a few components to this.
> 1) dataset-generator
> Test Kafka dataset generation tool. Generates a random text file with given
> params:
> {code}
> --filename, -f - output file name.
> --filesize, -s - desired size of output file. The actual size will always be
> a bit larger (with a maximum size of $filesize + $max.length - 1).
> --min.length, -l - minimum generated entry length.
> --max.length, -h - maximum generated entry length.
> Usage:
> ./gradlew build
> java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 100000 -l 2 -h 20
> {code}
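> A minimal sketch of what the generator's core loop could look like (an illustrative
> assumption, not the actual tool's source; the entry format and the hard-coded
> parameter values are made up):
> {code}
> import java.io.{BufferedWriter, FileWriter}
> import scala.util.Random
>
> object DatasetGeneratorSketch extends App {
>   // stand-ins for --filename, --filesize, --min.length, --max.length
>   val (filename, filesize, minLength, maxLength) = ("dataset", 100000L, 2, 20)
>   val writer = new BufferedWriter(new FileWriter(filename))
>   var written = 0L
>   // keep appending random entries until the requested size is reached,
>   // so the final size overshoots slightly, as described above
>   while (written < filesize) {
>     val length = minLength + Random.nextInt(maxLength - minLength + 1)
>     val entry = Random.alphanumeric.take(length).mkString
>     writer.write(entry)
>     writer.newLine()
>     written += entry.length + 1
>   }
>   writer.close()
> }
> {code}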
> 2) dataset-producer
> Test Kafka dataset producer tool. It is able to produce a given dataset to Kafka
> or to a Syslog server. The idea here is that you already have lots of datasets
> that you want to test different things with. You might have different message
> sizes, formats, etc. and want a repeatable benchmark to run and re-run the
> testing on. You could just have a day's worth of data and choose to replay it.
> The CCTK idea is that you always start from CONSUME in the state of your
> library. If your library only produces then you will fail a bunch of tests, and
> that might be ok for people. A hedged sketch of such a producer loop follows the
> parameter list below.
> Accepts the following params:
> {code}
> --filename, -f - input file name.
> --kafka, -k - Kafka broker address in host:port format. If this parameter is
> set, --producer.config and --topic must be set too (otherwise they're ignored).
> --producer.config, -p - Kafka producer properties file location.
> --topic, -t - Kafka topic to produce to.
> --syslog, -s - Syslog server address. Format: protocol://host:port
> (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
> --loop, -l - flag to loop through the file until shut off manually. False by
> default.
> Usage:
> ./gradlew build
> java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true
> {code}
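> As a rough illustration of the Kafka path of such a producer (not the actual
> tool's source; the class name, broker address, topic and file name are
> assumptions), a loop over the dataset using the Scala producer API that ships
> with Kafka 0.8 could look like this:
> {code}
> import java.util.Properties
> import scala.io.Source
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
>
> object DatasetProducerSketch extends App {
>   val props = new Properties()
>   props.put("metadata.broker.list", "localhost:9092")            // --kafka
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   val producer = new Producer[String, String](new ProducerConfig(props))
>
>   val (filename, topic, loop) = ("dataset", "source", false)     // --filename, --topic, --loop
>   do {
>     // every line of the dataset becomes one message on the topic
>     for (line <- Source.fromFile(filename).getLines())
>       producer.send(new KeyedMessage[String, String](topic, line))
>   } while (loop)
>
>   producer.close()
> }
> {code}
> The Syslog path would be analogous, writing each line to the configured
> protocol://host:port endpoint instead of a Kafka topic.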
> 3) extract
> This step is good so you can save data and compare tests. It could also be
> removed if folks are just looking for a real live test (and we could support
> that too). Here we are taking data out of Kafka and putting it into Cassandra
> (but other data stores can be used too, and we should come up with a way to
> abstract this out completely so folks could implement whatever they wanted).
> {code}
> package ly.stealth.shaihulud.reader
> import java.util.UUID
> import com.datastax.spark.connector._
> import com.datastax.spark.connector.cql.CassandraConnector
> import consumer.kafka.MessageAndMetadata
> import consumer.kafka.client.KafkaReceiver
> import org.apache.spark._
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.dstream.DStream
> object Main extends App with Logging {
>   val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
>     head("Spark Reader for Kafka client applications", "1.0")
>     opt[String]("testId") unbounded() optional() action { (x, c) =>
>       c.copy(testId = x)
>     } text ("Test ID")
>     opt[String]("source") unbounded() required() action { (x, c) =>
>       c.copy(sourceTopic = x)
>     } text ("Source topic with initial set of data")
>     opt[String]("destination") unbounded() required() action { (x, c) =>
>       c.copy(destinationTopic = x)
>     } text ("Destination topic with processed set of data")
>     opt[Int]("partitions") unbounded() optional() action { (x, c) =>
>       c.copy(partitions = x)
>     } text ("Partitions in topic")
>     opt[String]("zookeeper") unbounded() required() action { (x, c) =>
>       c.copy(zookeeper = x)
>     } text ("Zookeeper connection host:port")
>     opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) =>
>       c.copy(kafkaFetchSize = x)
>     } text ("Maximum KBs to fetch from Kafka")
>     checkConfig { c =>
>       if (c.testId.isEmpty || c.sourceTopic.isEmpty || c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
>         failure("You haven't provided all required parameters")
>       } else {
>         success
>       }
>     }
>   }
>
>   val config = parser.parse(args, ReaderConfiguration()) match {
>     case Some(c) => c
>     case None => sys.exit(1)
>   }
>
>   val sparkConfig = new SparkConf().setAppName("kafka_client_validator")
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   val ssc = new StreamingContext(sparkConfig, Seconds(10))
>   ssc.checkpoint("reader")
>
>   // Create the keyspace and the tests/counters/messages tables used by the validator
>   CassandraConnector(sparkConfig).withSessionDo(session => {
>     session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_client_validation WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.tests(test_id text PRIMARY KEY, source_topic text, destination_topic text)")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.counters(test_id text, topic text, total counter, PRIMARY KEY(test_id, topic))")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.messages(test_id text, topic text, partition int, offset int, payload text, PRIMARY KEY(test_id, topic, partition, offset))")
>   })
>
>   val test = Test(config.testId, config.sourceTopic, config.destinationTopic)
>   ssc.sparkContext.parallelize(Seq(test)).saveToCassandra("kafka_client_validation", "tests")
>
>   startStreamForTopic(test.test_id, config.sourceTopic, config)
>   startStreamForTopic(test.test_id, config.destinationTopic, config)
>
>   ssc.start()
>   ssc.awaitTermination()
>
>   def startStreamForTopic(testId: String, topic: String, config: ReaderConfiguration) {
>     val stream = createKafkaStream(config.zookeeper, topic, config.partitions)
>       .repartition(config.partitions)
>       .persist(StorageLevel.MEMORY_AND_DISK_SER)
>
>     // Running message count per topic
>     stream.map(message => {
>       Counter(testId, message.getTopic, 1L)
>     }).reduce((prev, curr) => {
>       Counter(testId, prev.topic, prev.total + curr.total)
>     }).foreachRDD(rdd => {
>       rdd.saveToCassandra("kafka_client_validation", "counters")
>     })
>
>     // Every message with its topic, partition, offset and payload
>     stream.map(message => {
>       Message(testId, message.getTopic, message.getPartition.partition, message.getOffset, new String(message.getPayload))
>     }).foreachRDD(rdd => {
>       rdd.saveToCassandra("kafka_client_validation", "messages")
>     })
>   }
>
>   private def createKafkaStream(zkConnect: String, topic: String, partitions: Int): DStream[MessageAndMetadata] = {
>     val zkhosts = zkConnect.split(":")(0)
>     val zkports = zkConnect.split(":")(1)
>     val kafkaParams = Map("zookeeper.hosts" -> zkhosts,
>       "zookeeper.port" -> zkports,
>       "zookeeper.consumer.connection" -> zkConnect,
>       "zookeeper.broker.path" -> "/brokers",
>       "zookeeper.consumer.path" -> "/consumers",
>       "fetch.size.bytes" -> (config.kafkaFetchSize * 1024).toString,
>       "kafka.topic" -> topic,
>       "kafka.consumer.id" -> "%s-%s".format(topic, UUID.randomUUID().toString))
>     val props = new java.util.Properties()
>     kafkaParams foreach { case (key, value) => props.put(key, value) }
>
>     // One receiver per partition, unioned into a single DStream
>     val streams = (0 to partitions - 1).map { partitionId =>
>       ssc.receiverStream(new KafkaReceiver(StorageLevel.MEMORY_AND_DISK_SER, props, partitionId))
>     }
>     ssc.union(streams)
>   }
> }
>
> case class Test(test_id: String = "", source_topic: String = "", destination_topic: String = "")
>
> case class Counter(test_id: String = "", topic: String = "", total: Long = 0L)
>
> case class Message(test_id: String = "", topic: String = "", partition: Int = 0, offset: Long = 0, payload: String = "")
>
> case class ReaderConfiguration(testId: String = UUID.randomUUID().toString, sourceTopic: String = "",
>                                destinationTopic: String = "", partitions: Int = 1,
>                                zookeeper: String = "", kafkaFetchSize: Int = 8)
> {code}
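> With the parser above in mind, a run of the extract job could look roughly like
> this (the jar path, test id, topic names and ZooKeeper address are illustrative
> assumptions; the option names come from the parser in the code block):
> {code}
> ./gradlew build
> spark-submit --class ly.stealth.shaihulud.reader.Main reader/build/libs/reader-*.jar \
>   --testId my-test-1 --source source-topic --destination destination-topic \
>   --zookeeper localhost:2181 --partitions 1
> {code}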
> 4) validator
> This is pluggable both in how the topics are read and in how the results are
> processed once done.
> Right now we have been checking this out using Spark and Cassandra; we are also
> looking at Spark with HBase, and at Samza with Mesos support. The nice thing
> about using Samza is that we really don't have to use another data store, it is
> just so easy to put the results back into a topic.
> Here is roughly what the Spark/Cassandra version looks like for checking whether
> or not a consumer/producer a) provides an at-least-once processing guarantee,
> b) preserves ordering, c) etc, etc, etc. While this test is running, as much
> negative testing as you want can be done against the cluster. It is made to run
> in an environment where you want to pump through as much data as you can, as
> fast as you can, and then, once done, analyze it.
> {code}
> package ly.stealth.shaihulud.validator
> import java.security.MessageDigest
> import java.util.Iterator
> import com.datastax.driver.core.{Cluster, Row, SocketOptions}
> object Main extends App {
>   val parser = new scopt.OptionParser[ValidatorConfiguration]("spark-validator") {
>     head("Spark Validator for Kafka client applications", "1.0")
>     opt[String]("test.id") unbounded() required() action { (x, c) =>
>       c.copy(testId = x)
>     } text ("Test ID")
>     opt[String]("cassandra.connect") unbounded() required() action { (x, c) =>
>       c.copy(cassandraConnect = x)
>     } text ("Cassandra host")
>     opt[String]("cassandra.user") unbounded() required() action { (x, c) =>
>       c.copy(cassandraUser = x)
>     } text ("Cassandra user")
>     opt[String]("cassandra.password") unbounded() required() action { (x, c) =>
>       c.copy(cassandraPassword = x)
>     } text ("Cassandra password")
>     checkConfig { c =>
>       if (c.testId.isEmpty || c.cassandraConnect.isEmpty || c.cassandraUser.isEmpty || c.cassandraPassword.isEmpty) {
>         failure("You haven't provided all required parameters")
>       } else {
>         success
>       }
>     }
>   }
>
>   val config = parser.parse(args, ValidatorConfiguration()) match {
>     case Some(c) => c
>     case None => sys.exit(1)
>   }
>
>   val cluster = new Cluster.Builder()
>     .addContactPoints(config.cassandraConnect)
>     .withSocketOptions(new SocketOptions().setTcpNoDelay(true))
>     .build()
>   val session = cluster.connect("kafka_client_validation")
>
>   val tests = session.execute("SELECT * FROM kafka_client_validation.tests WHERE test_id='%s'".format(config.testId))
>   val test = tests.one()
>   if (test != null) {
>     val testId = test.getString("test_id")
>     val sourceTopic = test.getString("source_topic")
>     val destinationTopic = test.getString("destination_topic")
>
>     val countersQuery = "SELECT * FROM kafka_client_validation.counters WHERE test_id='%s' AND topic='%s'"
>     val sourceCounter = session.execute(countersQuery.format(testId, sourceTopic))
>     val destinationCounter = session.execute(countersQuery.format(testId, destinationTopic))
>
>     println("***** TEST RESULTS *****")
>     var sameAmount = false
>     val totalInSource = sourceCounter.one().getLong("total")
>     val totalInDestination = destinationCounter.one().getLong("total")
>     if (totalInSource == totalInDestination) {
>       sameAmount = true
>     }
>     println(" - Destination topic contains the same amount of messages as Source topic (%d out of %d): %B".format(totalInSource, totalInDestination, sameAmount))
>
>     val messagesQuery = "SELECT * FROM kafka_client_validation.messages WHERE test_id='%s' AND topic='%s'"
>     val sourceMessages = session.execute(messagesQuery.format(testId, sourceTopic))
>     val destinationMessages = session.execute(messagesQuery.format(testId, destinationTopic))
>     val si = sourceMessages.iterator()
>     val di = destinationMessages.iterator()
>     val portionSize = 1000
>     var isOrderPreserved = true
>     while ((si.hasNext || di.hasNext) && isOrderPreserved) {
>       val sourceHash = this.calculateMD5ForSlice(si, portionSize)
>       val destinationHash = this.calculateMD5ForSlice(di, portionSize)
>       if (sourceHash != destinationHash) {
>         isOrderPreserved = false
>       }
>     }
>     println(" - Destination topic preserves ordering of Source topic: %B".format(isOrderPreserved))
>   } else {
>     System.err.println("There is no such test '%s'".format(config.testId))
>   }
>   cluster.close()
>
>   def calculateMD5ForSlice(it: Iterator[Row], portionSize: Int): String = {
>     val sb = new StringBuilder
>     var left = portionSize
>     while (it.hasNext && left > 0) {
>       sb.append(it.next.getString("payload"))
>       left = left - 1
>     }
>     new String(MessageDigest.getInstance("MD5").digest(sb.toString().getBytes("UTF-8")))
>   }
> }
>
> case class ValidatorConfiguration(testId: String = "", cassandraConnect: String = "",
>                                   cassandraUser: String = "", cassandraPassword: String = "")
> {code}
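> Once a test run has finished, invoking the validator could look roughly like this
> (the jar path and Cassandra credentials are illustrative assumptions; the option
> names come from the parser above):
> {code}
> ./gradlew build
> java -jar validator/build/libs/validator-*.jar --test.id my-test-1 \
>   --cassandra.connect 127.0.0.1 --cassandra.user cassandra --cassandra.password cassandra
> {code}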
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)