Rumel created KAFKA-9547:
----------------------------

             Summary: Kafka transaction - skip one offset when the application 
stops and be started again
                 Key: KAFKA-9547
                 URL: https://issues.apache.org/jira/browse/KAFKA-9547
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.4.0
         Environment: I am using kafka-clients 2.4.0 and 
wurstmeister/kafka:2.12-2.3.0
            Reporter: Rumel


To be fair, I have tested it with normal kafka without transaction scheme, and 
it does not skip the offset when I try to rerun the ProducerTest like a lot of 
times.
{code:java}
object ProducerTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("zxc", "key", "value")
    val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
    producer.send(record)
    producer.send(record2)
    producer.send(record3)
    Thread.sleep(3000)
  }
}{code}
But when I enable transaction on producer, it will skip one offset when the 
ProducerTestWithTransaction application is rerun. Like when I first started it, 
it has an offset of 0,1,2 then after rerun, it will be 4,5,6 which skips 3, and 
so on and so forth.
{code:java}
object ProducerTestWithTransaction extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
    props.put("enable.idempotence", "true")
    props.put("transactional.id", "alona")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("wew", "key", "value")
    val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
    producer.initTransactions()
    try {
      producer.beginTransaction()
      producer.send(record)
      producer.send(record2)
      producer.send(record3)
      producer.commitTransaction()
    } catch {
      case e: ProducerFencedException => producer.close()
      case e: Exception => producer.abortTransaction();
    }
  }
}{code}
Please enlighten me why this is happening? Is this the standard behavior when 
we are using transaction? Is there any workaround on this to not skip an 
offset. Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to