I assume you are referring to "commit markers". Each time you call commitTransaction(), a special message called a commit marker is written to the log to indicate a successful transaction (there are also "abort markers" if a transaction gets aborted).
Those markers "eat up" one offset each, but they are never delivered to the application; they are filtered out on read. Thus, when using transactions, you cannot infer from `endOffset - startOffset` of a partition how many messages are actually in the topic. You can verify this by consuming the topic and inspecting the offsets of the returned messages -- commit/abort markers are skipped, so you won't receive messages with consecutive offsets.

-Matthias

On 8/22/18 8:34 AM, jingguo yao wrote:
> I am sending some Kafka messages over the Internet. The message sizes
> are about 400K. The essential logic of my code is as follows:
>
> Properties config = new Properties();
> config.put("bootstrap.servers", "...");
> config.put("client.id", "...");
> config.put("key.serializer",
>     "org.apache.kafka.common.serialization.ByteArraySerializer");
> config.put("value.serializer",
>     "org.apache.kafka.common.serialization.ByteArraySerializer");
> config.put("transactional.id", "...");
>
> producer = new KafkaProducer<>(config);
> producer.initTransactions();
>
> producer.beginTransaction();
> for (byte[] bytesMessage : batch) {
>     producer.send(
>         new ProducerRecord<>("...", null, bytesMessage));
> }
> producer.commitTransaction();
>
> I found that there were two records for some bytesMessage on the
> topic. Is there something wrong with my code? Or are duplicate message
> deliveries still possible with transactional.id set?