I have a cluster of 3 Kafka brokers. With the following script I send some
data to Kafka and, in the middle, do a controlled shutdown of 1 broker. All
3 brokers are in the ISR before I start sending. When I shut down the broker
I get a couple of exceptions, and I expect that the data from those failed
sends is not written. Say I send 1500 lines and get 50 exceptions. I expect
to consume 1450 lines, but instead I always consume more, e.g. 1480 or 1490.
I want to decide myself whether to retry sending, rather than rely on
message.send.max.retries. But it looks like if I retry whenever there is an
exception, I will end up with duplicates. Is there anything I'm doing wrong,
or do I have wrong assumptions about Kafka?

Thanks.

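import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
var count = 0
for (line <- Source.fromFile(file).getLines()) {
    try {
      // NOTE: `file` and `buffer` are not defined in this snippet
      prod.send("benchmark", buffer.toList)
      count += 1
      println("sent %s".format(count))
    } catch {
      case e: Exception => println("Exception! " + e)
    }
}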

class MyProducer(brokerList: String) {
  val sync = true
  val requestRequiredAcks = "-1"

  val props = new Properties()
  props.put("metadata.broker.list", brokerList)
  props.put("producer.type", if(sync) "sync" else "async")
  props.put("request.required.acks", requestRequiredAcks)
  props.put("key.serializer.class", classOf[StringEncoder].getName)
  props.put("serializer.class", classOf[StringEncoder].getName)
  props.put("message.send.max.retries", "0")
  props.put("request.timeout.ms", "2000")

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def send(topic: String, messages: List[String]) = {
    val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
    for (message <- messages) {
      requests += new KeyedMessage(topic, null, message, message)
    }
    // Producer.send takes varargs, so expand the buffer
    producer.send(requests: _*)
  }
}
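
To make the duplicate concern concrete, here is a minimal sketch that does
not touch Kafka at all. It assumes (and this is only my assumption, not
confirmed broker behavior) that a send can be stored and still surface as an
exception on the client. FlakyLog and sendWithRetry are hypothetical names
made up for this illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a broker log. It stores the message first and
// may then fail to acknowledge, which is the case I am worried about.
class FlakyLog(failOnAttempts: Set[Int]) {
  val stored = ArrayBuffer[String]()
  private var attempts = 0

  def append(message: String): Unit = {
    attempts += 1
    stored += message // the write happens
    if (failOnAttempts.contains(attempts))
      throw new RuntimeException("ack timed out") // but the caller sees a failure
  }
}

// Hypothetical manual retry: call `send` until it completes without an
// exception or the attempts run out. Returns true if some attempt succeeded.
def sendWithRetry(attemptsLeft: Int)(send: () => Unit): Boolean = {
  if (attemptsLeft <= 0) false
  else {
    val ok = try { send(); true } catch { case _: Exception => false }
    if (ok) true else sendWithRetry(attemptsLeft - 1)(send)
  }
}

// The second append overall (the first try for "b") is stored but reported
// as failed, so the retry stores "b" a second time.
val log = new FlakyLog(failOnAttempts = Set(2))
val messages = List("a", "b", "c")
val allAcked = messages.forall(m => sendWithRetry(3)(() => log.append(m)))
println("acked=%s stored=%s".format(allAcked, log.stored.mkString(",")))
// prints: acked=true stored=a,b,b,c
```

Three messages are sent and all are eventually acknowledged, yet four
entries are stored. If real sends can fail this way, it would explain both
the extra lines I consume and the duplicates I would get from retrying.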
