I assume you are referring to "commit markers". Each time you call
commitTransaction(), a special message called a commit marker is written
to the log to indicate a successful transaction (there are also "abort
markers" if a transaction gets aborted).

Those markers "eat up" one offset each, but they won't be delivered to
the application; they are filtered out on read. Thus, when using
transactions, you cannot infer from `endOffset - startOffset` of a
partition how many messages are actually in the topic.
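To make the arithmetic concrete, here is a purely illustrative sketch (the transaction counts and sizes are made up, and it assumes each committed transaction writes exactly one marker to the partition):

```java
public class OffsetGapDemo {
    public static void main(String[] args) {
        long startOffset = 0;
        int transactions = 3;          // hypothetical: 3 committed transactions
        int messagesPerTxn = 2;        // hypothetical: 2 records each

        // Each transaction occupies its records plus one commit marker,
        // so the log end offset advances by (messagesPerTxn + 1) per txn.
        long endOffset = startOffset + transactions * (messagesPerTxn + 1);
        long actualMessages = (long) transactions * messagesPerTxn;

        System.out.println(endOffset - startOffset); // 9 offsets consumed
        System.out.println(actualMessages);          // but only 6 real records
    }
}
```

The offset delta (9) overcounts the deliverable records (6) by one per committed transaction, which is why the offset difference is not a message count.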

You can verify this by consuming the topic and inspecting the offsets of
the returned messages -- commit/abort markers are skipped, so you won't
receive messages with consecutive offsets.

-Matthias

On 8/22/18 8:34 AM, jingguo yao wrote:
> I am sending some Kafka messages over the Internet. The message sizes
> are about 400K. The essential logic of my code is as follows:
> 
> Properties config = new Properties();
> config.put("bootstrap.servers", "...");
> config.put("client.id", "...");
> config.put("key.serializer",
> "org.apache.kafka.common.serialization.ByteArraySerializer");
> config.put("value.serializer",
> "org.apache.kafka.common.serialization.ByteArraySerializer");
> config.put("transactional.id", "...");
> 
> producer = new KafkaProducer<>(config);
> producer.initTransactions();
> 
> producer.beginTransaction();
> for (byte[] bytesMessage : batch) {
>   producer.send(
>       new ProducerRecord<>("...", null, bytesMessage));
> }
> producer.commitTransaction();
> 
> I found that there were two records for some bytesMessage on the
> topic. Is there something wrong with my code? Or are duplicate message
> deliveries still possible even with transactional.id set?
> 
