[ 
https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1898:
--------------------------------
    Fix Version/s:     (was: 0.8.3)

> compatibility testing framework 
> --------------------------------
>
>                 Key: KAFKA-1898
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1898
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joe Stein
>         Attachments: cctk.png
>
>
> There are a few different scenarios where you want/need to know the 
> status/state of a client library that works with Kafka. Client library 
> development is not just about supporting the wire protocol but also the 
> implementations around specific interactions of the API.  The API has 
> blossomed into a robust set of producer, consumer, broker and administrative 
> calls, all of which have layers of logic above them.  A client library may 
> choose to deviate from the path the project sets out, and that is ok. The goal 
> of this ticket is to have a system for Kafka that can help to explain what 
> the library is or isn't doing (regardless of what it claims).
> The idea behind this stems from being able to quickly/easily/succinctly analyze 
> the topic message data. Once you can analyze the topic(s) messages you can 
> gather lots of information about what the client library is and is not 
> doing.  There are a few components to this.
> 1) dataset-generator 
> Test Kafka dataset generation tool. Generates a random text file with the 
> given params:
> --filename, -f - output file name.
> --filesize, -s - desired size of the output file. The actual size will always 
> be a bit larger (with a maximum size of $filesize + $max.length - 1).
> --min.length, -l - minimum generated entry length.
> --max.length, -h - maximum generated entry length.
> Usage:
> ./gradlew build
> java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 100000 -l 2 -h 20
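> A minimal sketch of the generation loop (illustrative only, assuming plain 
> scala.util.Random rather than whatever the actual tool uses internally): entries 
> of uniformly random length between min.length and max.length are appended until 
> the requested size is reached, which is why the output can overshoot by up to 
> $max.length - 1.
> {code}
> import java.io.{BufferedWriter, FileWriter}
> import scala.util.Random
> 
> // Hypothetical sketch of the dataset-generator loop, not the tool's actual code.
> object DatasetGeneratorSketch {
>   def generate(filename: String, filesize: Long, minLength: Int, maxLength: Int) {
>     val writer = new BufferedWriter(new FileWriter(filename))
>     var written = 0L
>     try {
>       while (written < filesize) {
>         // entry length is uniform in [minLength, maxLength]
>         val length = minLength + Random.nextInt(maxLength - minLength + 1)
>         val entry = Random.alphanumeric.take(length).mkString
>         writer.write(entry)
>         writer.newLine()
>         // the last entry may push the file past filesize, hence the
>         // "$filesize + $max.length - 1" upper bound mentioned above
>         written += entry.length + 1
>       }
>     } finally {
>       writer.close()
>     }
>   }
> }
> {code}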
> 2) dataset-producer
> Test Kafka dataset producer tool. Able to produce the given dataset to Kafka 
> or to a Syslog server.  The idea here is that you already have lots of data sets 
> that you want to test different things with. You might have different-sized 
> messages, formats, etc. and want a repeatable benchmark to run and re-run the 
> testing on. You could just have a day's worth of data and simply choose to 
> replay it.  The CCTK idea is that you are always starting from CONSUME in 
> your library's state. If your library is only producing then you will fail a 
> bunch of tests, and that might be ok for people.
> Accepts the following params:
> {code}
> --filename, -f - input file name.
> --kafka, -k - Kafka broker address in host:port format. If this parameter is 
> set, --producer.config and --topic must be set too (otherwise they're 
> ignored).
> --producer.config, -p - Kafka producer properties file location.
> --topic, -t - Kafka topic to produce to.
> --syslog, -s - Syslog server address. Format: protocol://host:port 
> (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
> --loop, -l - flag to loop through file until shut off manually. False by 
> default.
> Usage:
> ./gradlew build
> java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true
> {code}
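> A rough sketch of the Kafka branch of the replay loop (assuming the new 
> org.apache.kafka.clients.producer API; the real tool also handles the Syslog 
> path and the --producer.config file, which are left out here):
> {code}
> import java.util.Properties
> import scala.io.Source
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> 
> // Hypothetical sketch of dataset-producer's Kafka path, not the tool itself.
> object DatasetProducerSketch {
>   def produce(filename: String, broker: String, topic: String, loop: Boolean) {
>     val props = new Properties()
>     props.put("bootstrap.servers", broker)
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     val producer = new KafkaProducer[String, String](props)
>     try {
>       do {
>         // replay the dataset line by line; with --loop the file is re-read until shut off
>         Source.fromFile(filename).getLines().foreach { line =>
>           producer.send(new ProducerRecord[String, String](topic, line))
>         }
>       } while (loop)
>     } finally {
>       producer.close()
>     }
>   }
> }
> {code}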
> 3) extract
> This step is good so you can save data and compare tests. It could also be 
> removed if folks are just looking for a real live test (and we could support 
> that too).  Here we are taking data out of Kafka and putting it into 
> Cassandra (but other data stores can be used too, and we should come up with a 
> way to abstract this out completely so folks could implement whatever they 
> wanted).
> {code}
> package ly.stealth.shaihulud.reader
> import java.util.UUID
> import com.datastax.spark.connector._
> import com.datastax.spark.connector.cql.CassandraConnector
> import consumer.kafka.MessageAndMetadata
> import consumer.kafka.client.KafkaReceiver
> import org.apache.spark._
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.dstream.DStream
> object Main extends App with Logging {
>   val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
>     head("Spark Reader for Kafka client applications", "1.0")
>     opt[String]("testId") unbounded() optional() action { (x, c) =>
>       c.copy(testId = x)
>     } text ("Test ID")
>     opt[String]("source") unbounded() required() action { (x, c) =>
>       c.copy(sourceTopic = x)
>     } text ("Source topic with initial set of data")
>     opt[String]("destination") unbounded() required() action { (x, c) =>
>       c.copy(destinationTopic = x)
>     } text ("Destination topic with processed set of data")
>     opt[Int]("partitions") unbounded() optional() action { (x, c) =>
>       c.copy(partitions = x)
>     } text ("Partitions in topic")
>     opt[String]("zookeeper") unbounded() required() action { (x, c) =>
>       c.copy(zookeeper = x)
>     } text ("Zookeeper connection host:port")
>     opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) =>
>       c.copy(kafkaFetchSize = x)
>     } text ("Maximum KBs to fetch from Kafka")
>     checkConfig { c =>
>       if (c.testId.isEmpty || c.sourceTopic.isEmpty || c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
>         failure("You haven't provided all required parameters")
>       } else {
>         success
>       }
>     }
>   }
>   val config = parser.parse(args, ReaderConfiguration()) match {
>     case Some(c) => c
>     case None => sys.exit(1)
>   }
>   val sparkConfig = new SparkConf().setAppName("kafka_client_validator")
>                                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   val ssc = new StreamingContext(sparkConfig, Seconds(10))
>   ssc.checkpoint("reader")
>   CassandraConnector(sparkConfig).withSessionDo( session => {
>     session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_client_validation WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.tests(test_id text PRIMARY KEY, source_topic text, destination_topic text)")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.counters(test_id text, topic text, total counter, PRIMARY KEY(test_id, topic))")
>     session.execute("CREATE TABLE IF NOT EXISTS kafka_client_validation.messages(test_id text, topic text, partition int, offset bigint, payload text, PRIMARY KEY(test_id, topic, partition, offset))")
>   })
>   val test = Test(config.testId, config.sourceTopic, config.destinationTopic)
>   ssc.sparkContext.parallelize(Seq(test)).saveToCassandra("kafka_client_validation", "tests")
>   startStreamForTopic(test.test_id, config.sourceTopic, config)
>   startStreamForTopic(test.test_id, config.destinationTopic, config)
>   ssc.start()
>   ssc.awaitTermination()
>   def startStreamForTopic(testId: String, topic: String, config: ReaderConfiguration) {
>     val stream = createKafkaStream(config.zookeeper, topic, config.partitions).repartition(config.partitions).persist(StorageLevel.MEMORY_AND_DISK_SER)
>     stream.map(message => {
>       Counter(testId, message.getTopic, 1L)
>     }).reduce((prev, curr) => {
>       Counter(testId, prev.topic, prev.total + curr.total)
>     }).foreachRDD(rdd => {
>       rdd.saveToCassandra("kafka_client_validation", "counters")
>     })
>     stream.map(message => {
>       Message(testId, message.getTopic, message.getPartition.partition, message.getOffset, new String(message.getPayload))
>     }).foreachRDD(rdd => {
>       rdd.saveToCassandra("kafka_client_validation", "messages")
>     })
>   }
>   private def createKafkaStream(zkConnect: String, topic: String, partitions: Int): DStream[MessageAndMetadata] = {
>     val zkhosts = zkConnect.split(":")(0)
>     val zkports = zkConnect.split(":")(1)
>     val kafkaParams = Map("zookeeper.hosts" -> zkhosts,
>                           "zookeeper.port" -> zkports,
>                           "zookeeper.consumer.connection" -> zkConnect,
>                           "zookeeper.broker.path" -> "/brokers",
>                           "zookeeper.consumer.path" -> "/consumers",
>                           "fetch.size.bytes" -> (config.kafkaFetchSize * 1024).toString,
>                           "kafka.topic" -> topic,
>                           "kafka.consumer.id" -> "%s-%s".format(topic, UUID.randomUUID().toString))
>     val props = new java.util.Properties()
>     kafkaParams foreach { case (key, value) => props.put(key, value)}
>     val streams = (0 to partitions - 1).map { partitionId =>
>       ssc.receiverStream(new KafkaReceiver(StorageLevel.MEMORY_AND_DISK_SER, props, partitionId))
>     }
>     ssc.union(streams)
>   }
> }
> case class Test(test_id: String = "", source_topic: String = "", destination_topic: String = "")
> case class Counter(test_id: String = "", topic: String = "", total: Long = 0L)
> case class Message(test_id: String = "", topic: String = "", partition: Int = 0, offset: Long = 0, payload: String = "")
> case class ReaderConfiguration(testId: String = UUID.randomUUID().toString, sourceTopic: String = "", destinationTopic: String = "",
>                                partitions: Int = 1, zookeeper: String = "", kafkaFetchSize: Int = 8)
> {code}
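> On the "abstract this out" point above, one possible shape (purely a sketch, the 
> trait and names are hypothetical and not part of the code above) is to hide the 
> store behind a small writer interface so that Cassandra, HBase, or a topic-backed 
> implementation could be swapped in:
> {code}
> // Hypothetical store abstraction for the extract step; the reader would depend on
> // this trait and Cassandra/HBase/Kafka-topic backends would each implement it.
> trait ValidationStore {
>   def saveTest(test: Test): Unit
>   def saveCounter(counter: Counter): Unit
>   def saveMessage(message: Message): Unit
> }
> 
> // Trivial reference implementation that just prints, handy for a "real live test"
> // where no external data store is wanted.
> class ConsoleValidationStore extends ValidationStore {
>   override def saveTest(test: Test): Unit = println("test: " + test)
>   override def saveCounter(counter: Counter): Unit = println("counter: " + counter)
>   override def saveMessage(message: Message): Unit = println("message: " + message)
> }
> {code}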
> 4) validator
> This is pluggable both for how to read the topics and for how to process the 
> results once done.
> Right now we have been checking out using Spark and Cassandra for this; we 
> are also looking at Spark with HBase, and at Samza with the Mesos support.  The 
> nice thing about using Samza is we really don't have to use another data 
> store, it is just so easy to put the results back into a topic.
> Here is kind of what the Spark/Cassandra version looks like for checking whether 
> or not a consumer/producer a) gives an at-least-once processing guarantee, b) is 
> order preserving, c) etc, etc, etc. While this test is running, as much 
> negative testing as you want can be done to the cluster. It is made to run 
> in an environment where you want to pump through as much data as you can, as 
> fast as you can, and then once done, analyze it.
> {code}
> package ly.stealth.shaihulud.validator
> import java.security.MessageDigest
> import java.util.Iterator
> import com.datastax.driver.core.{Cluster, Row, SocketOptions}
> object Main extends App {
>   val parser = new scopt.OptionParser[ValidatorConfiguration]("spark-validator") {
>     head("Spark Validator for Kafka client applications", "1.0")
>     opt[String]("test.id") unbounded() required() action { (x, c) =>
>       c.copy(testId = x)
>     } text ("Test ID")
>     opt[String]("cassandra.connect") unbounded() required() action { (x, c) =>
>       c.copy(cassandraConnect = x)
>     } text ("Cassandra host")
>     opt[String]("cassandra.user") unbounded() required() action { (x, c) =>
>       c.copy(cassandraUser = x)
>     } text ("Cassandra user")
>     opt[String]("cassandra.password") unbounded() required() action { (x, c) =>
>       c.copy(cassandraPassword = x)
>     } text ("Cassandra password")
>     checkConfig { c =>
>       if (c.testId.isEmpty || c.cassandraConnect.isEmpty || c.cassandraUser.isEmpty || c.cassandraPassword.isEmpty) {
>         failure("You haven't provided all required parameters")
>       } else {
>         success
>       }
>     }
>   }
>   val config = parser.parse(args, ValidatorConfiguration()) match {
>     case Some(c) => c
>     case None => sys.exit(1)
>   }
>   val cluster = new Cluster.Builder()
>     .addContactPoints(config.cassandraConnect)
>     // use the required Cassandra credentials when connecting
>     .withCredentials(config.cassandraUser, config.cassandraPassword)
>     .withSocketOptions(new SocketOptions().setTcpNoDelay(true))
>     .build()
>   val session = cluster.connect("kafka_client_validation")
>   val tests = session.execute("SELECT * FROM kafka_client_validation.tests WHERE test_id='%s'".format(config.testId))
>   val test = tests.one()
>   if (test != null) {
>     val testId = test.getString("test_id")
>     val sourceTopic = test.getString("source_topic")
>     val destinationTopic = test.getString("destination_topic")
>     val countersQuery = "SELECT * FROM kafka_client_validation.counters WHERE test_id='%s' AND topic='%s'"
>     val sourceCounter = session.execute(countersQuery.format(testId, sourceTopic))
>     val destinationCounter = session.execute(countersQuery.format(testId, destinationTopic))
>     println("***** TEST RESULTS *****")
>     var sameAmount = false
>     val totalInSource = sourceCounter.one().getLong("total")
>     val totalInDestination = destinationCounter.one().getLong("total")
>     if (totalInSource == totalInDestination) {
>       sameAmount = true
>     }
>     println(" - Destination topic contains the same amount of messages as Source topic (%d out of %d): %B".format(totalInSource, totalInDestination, sameAmount))
>     val messagesQuery = "SELECT * FROM kafka_client_validation.messages WHERE test_id='%s' AND topic='%s'"
>     val sourceMessages = session.execute(messagesQuery.format(testId, sourceTopic))
>     val destinationMessages = session.execute(messagesQuery.format(testId, destinationTopic))
>     val si = sourceMessages.iterator()
>     val di = destinationMessages.iterator()
>     val portionSize = 1000
>     var isOrderPreserved = true
>     while ((si.hasNext || di.hasNext) && isOrderPreserved) {
>       val sourceHash = this.calculateMD5ForSlice(si, portionSize)
>       val destinationHash = this.calculateMD5ForSlice(di, portionSize)
>       if (sourceHash != destinationHash) {
>         isOrderPreserved = false
>       }
>     }
>     println(" - Destination topic preserves ordering of Source topic: %B".format(isOrderPreserved))
>   } else {
>     System.err.println("There is no such test '%s'".format(config.testId))
>   }
>   cluster.close()
>   def calculateMD5ForSlice(it: Iterator[Row], portionSize: Int): String = {
>     val sb = new StringBuilder
>     var left = portionSize
>     while (it.hasNext && left > 0) {
>       sb.append(it.next.getString("payload"))
>       left = left - 1
>     }
>     // hex-encode the digest so slices compare reliably as plain strings
>     MessageDigest.getInstance("MD5").digest(sb.toString().getBytes("UTF-8")).map("%02x".format(_)).mkString
>   }
> }
> case class ValidatorConfiguration(testId: String = "", cassandraConnect: String = "", cassandraUser: String = "", cassandraPassword: String = "")
> {code}
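> The code above verifies the message count and ordering; the at-least-once part of 
> a) could also be checked directly with something along these lines (a sketch only, 
> not part of the validator above, and it assumes the destination payload set fits 
> in memory; a Spark join would be the scalable version):
> {code}
> import scala.collection.JavaConverters._
> import scala.collection.mutable
> import com.datastax.driver.core.Row
> 
> // Hypothetical helper: true if every payload recorded for the Source topic also
> // shows up at least once among the Destination topic's payloads.
> object AtLeastOnceCheck {
>   def holds(source: java.util.Iterator[Row], destination: java.util.Iterator[Row]): Boolean = {
>     val destinationPayloads = mutable.HashSet[String]()
>     destination.asScala.foreach(row => destinationPayloads += row.getString("payload"))
>     source.asScala.forall(row => destinationPayloads.contains(row.getString("payload")))
>   }
> }
> {code}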


