
We are implementing exactly-once processing in our application with the help of
KafkaConsumer's offsetsForTimes API.

Our Kafka producer uses transactional semantics, as in the pseudo steps below.

Transactions are enabled using: producerProps.put("transactional.id", "h3");

producer = new KafkaProducer<String,String>(producerProps);
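
For completeness, a minimal sketch of how the producer properties could be assembled. Only transactional.id = "h3" comes from our setup; the helper class, the broker address passed in, and the String serializers are assumptions for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds the properties that would be handed to
// new KafkaProducer<String, String>(props). Plain java.util only.
class TransactionalProducerConfig {
    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        // Assumed broker address and serializers, not taken from the original setup.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Setting transactional.id is what switches the producer to transactional mode.
        props.put("transactional.id", transactionalId);
        // Transactions require idempotence; set explicitly here for clarity.
        props.put("enable.idempotence", "true");
        return props;
    }
}
```

It would be called as TransactionalProducerConfig.build("localhost:9092", "h3"), where the address is a placeholder.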

Transactions are then committed as follows:

                1. producer.initTransactions();

                2. producer.beginTransaction();

                3. // push some data

                4. producer.commitTransaction();

                5. producer.beginTransaction();

                // Repeat steps 3, 4 and 5 in a loop

                // When done:

                6. producer.commitTransaction();

                7. producer.close();
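
The steps above can be sketched as a loop. This is a variant in which every batch gets its own begin/commit pair, so no transaction is left open when the loop ends. TxProducer is a hypothetical stand-in interface that only mirrors the KafkaProducer method names used above, and RecordingProducer is a fake that records calls, so the control flow can be run without a broker. Error handling is simplified: real code should close the producer rather than abort after fatal errors such as ProducerFencedException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the KafkaProducer calls used in the steps above.
interface TxProducer {
    void initTransactions();
    void beginTransaction();
    void send(String value);
    void commitTransaction();
    void abortTransaction();
    void close();
}

class TransactionLoop {
    // initTransactions() once, then one transaction per batch, then close().
    static void publish(TxProducer producer, List<List<String>> batches) {
        producer.initTransactions();
        try {
            for (List<String> batch : batches) {
                producer.beginTransaction();
                try {
                    for (String value : batch) {
                        producer.send(value);
                    }
                    producer.commitTransaction();
                } catch (RuntimeException e) {
                    // Simplified: abort the open transaction and rethrow.
                    producer.abortTransaction();
                    throw e;
                }
            }
        } finally {
            producer.close();
        }
    }
}

// Fake producer that records the order of calls instead of talking to Kafka.
class RecordingProducer implements TxProducer {
    final List<String> calls = new ArrayList<>();
    public void initTransactions() { calls.add("init"); }
    public void beginTransaction() { calls.add("begin"); }
    public void send(String value) { calls.add("send:" + value); }
    public void commitTransaction() { calls.add("commit"); }
    public void abortTransaction() { calls.add("abort"); }
    public void close() { calls.add("close"); }
}
```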

Our observations are as follows. We tried every combination of transactions
enabled/disabled and CreateTime current / 1 day old:

                1. Transactions disabled, CreateTime not provided (system time is used): offset fetching is correct.

                2. Transactions enabled, CreateTime not provided (system time is used): offset fetching is correct.

                3. Transactions disabled, CreateTime is 1 day old: offset fetching is correct.

                4. Transactions enabled, CreateTime is 1 day old: the offset returned seems to be that of the very first batch commit only.

                (Regarding case #4 above, we also tried setting the properties
message.timestamp.difference.max.ms and retention.ms, but the result is the same.
Our data ranges from a few hours to several days old, and again the result is the same.)
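
For reference, the KafkaConsumer javadoc describes offsetsForTimes as returning, per partition, the earliest offset whose timestamp is greater than or equal to the requested timestamp. Below is a minimal stand-alone model of that contract in plain Java. The class and method names are hypothetical, the list index stands in for the offset, and -1 stands in for the "no such record" case. It only illustrates the result expected from the lookup and is not Kafka's implementation.

```java
import java.util.List;

// Hypothetical model of the documented offsetsForTimes contract for one partition.
class OffsetForTimeModel {
    // timestamps.get(i) is the timestamp (ms) of the record at offset i.
    // Returns the earliest offset whose timestamp is >= target, or -1 if none exists.
    static long offsetForTime(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return offset;
            }
        }
        return -1L;
    }
}
```

Under this model, a partition whose record timestamps are 100, 200, 300 and 400 maps a target of 250 to offset 2, while a target at or below the first record's timestamp always maps to offset 0.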

Please let us know if we are missing something when using transactional semantics
to populate a Kafka topic.
