Or, to rephrase it more generally, is there a way to know exactly if
message was committed or no?


On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.ist...@gmail.com> wrote:

> Hello Guozhang,
>
> My partitions are split almost evenly between broker, so, yes - broker
> that I shutdown is the leader for some of them. Does it mean i can get an
> exception and data is still being written? Is there any setting on the
> broker where i can control this? I.e. can i make broker replication timeout
> shorter than producer timeout, so i can ensure if i get an exception data
> is not being committed?
>
> Thanks.
>
>
> On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>wrote:
>
>> Hello Kane,
>>
>> As discussed in the other thread, even if a timeout response is sent back
>> to the producer, the message may still be committed.
>>
>> Did you shut down the leader broker of the partition or a follower broker?
>>
>> Guozhang
>>
>> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.ist...@gmail.com> wrote:
>>
>> > I have cluster of 3 kafka brokers. With the following script I send some
>> > data to kafka and in the middle do the controlled shutdown of 1 broker.
>> All
>> > 3 brokers are ISR before I start sending. When i shutdown the broker i
>> get
>> > a couple of exceptions and I expect data shouldn't be written. Say, I
>> send
>> > 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
>> > instead i always consume more, i.e. 1480 or 1490. I want to decide if I
>> > want to retry sending myself, not using message.send.max.retries. But
>> looks
>> > like if I retry sending if there is an exception - I will end up with
>> > duplicates. Is there anything I'm doing wrong or having wrong
>> assumptions
>> > about kafka?
>> >
>> > Thanks.
>> >
>> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,
>> > 10.80.42.156:9092")
>> > var count = 0
>> > for(line <- Source.fromFile(file).getLines()){
>> >     try {
>> >       prod.send("benchmark", buffer.toList)
>> >       count += 1
>> >       println("sent %s", count)
>> >     } catch {
>> >       case _ => println("Exception!")
>> >     }
>> > }
>> >
>> > class MyProducer(brokerList: String) {
>> >   val sync = true
>> >   val requestRequiredAcks = "-1"
>> >
>> >   val props = new Properties()
>> >   props.put("metadata.broker.list", brokerList)
>> >   props.put("producer.type", if(sync) "sync" else "async")
>> >   props.put("request.required.acks", requestRequiredAcks)
>> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
>> >   props.put("serializer.class", classOf[StringEncoder].getName)
>> >   props.put("message.send.max.retries", "0")
>> >   props.put("request.timeout.ms", "2000")
>> >
>> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>> >
>> >   def send(topic: String, messages: List[String]) = {
>> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>> >     for (message <- messages) {
>> >       requests += new KeyedMessage(topic, null, message, message)
>> >     }
>> >     producer.send(requests)
>> >   }
>> > }
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to