Alex Fechner created KAFKA-4597: ----------------------------------- Summary: Record metadata returned by producer does'n consider log append time Key: KAFKA-4597 URL: https://issues.apache.org/jira/browse/KAFKA-4597 Project: Kafka Issue Type: Bug Components: clients, producer Affects Versions: 0.10.1.1 Reporter: Alex Fechner Priority: Minor
Kafka topics might be configured recording timestamps of the messages produced. There are two different timestamps which might be stored: # Record *create time*: The time the record is created by the client. # Log *append time*: The time the record has been added to the log by the broker. The [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html] docs state: {quote} In either of the cases above, the timestamp that has actually been used will be returned to user in RecordMetadata {quote} However I found the *create time* used in both cases. The following class creates two topics, one configured to store *create time*. The other used *log append time*. It produces 10 messages in each topic and outputs the timestamps from the record meta data as well as those fetched by a consumer client. {code:java} import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaTimestampTest { public static void main(String[] args) throws ExecutionException, InterruptedException { String ip = "10.0.10.8"; Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers", ip + ":9092"); producerProperties.put("acks", "all"); producerProperties.put("retries", 0); producerProperties.put("batch.size", 16384); producerProperties.put("linger.ms", 1); producerProperties.put("buffer.memory", 33554432); producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", ip + ":9092"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("session.timeout.ms", "30000"); consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Producer<String, String> producer = new KafkaProducer<>(producerProperties); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, ZKStringSerializer$.MODULE$); ZkConnection zkConnection = new ZkConnection(ip + ":2181"); ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); TopicPartition topicPartitionWithCreateTime = new TopicPartition("test-topic-with-create-time", 0); TopicPartition topicPartitionWithLogAppendTime = new TopicPartition("test-topic-with-log-append-time", 0); // create topic with create time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithCreateTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "CreateTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } // create topic with log append time if (!AdminUtils.topicExists(zkUtils, topicPartitionWithLogAppendTime.topic())) { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "LogAppendTime"); AdminUtils.createTopic(zkUtils, topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$); } consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, topicPartitionWithCreateTime)); String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, ConsumerRecordOffset: %s, ConsumerRecordTime: %s"; System.out.println(String.format("Create messages into topic %s ...", topicPartitionWithCreateTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithCreateTime.topic(), topicPartitionWithCreateTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithCreateTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithCreateTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } System.out.println(String.format("Create messages into topic %s...", topicPartitionWithLogAppendTime)); for (int i = 0; i < 10; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), topicPartitionWithLogAppendTime.partition(), "", "message")).get(); consumer.seek(topicPartitionWithLogAppendTime, recordMetadata.offset()); ConsumerRecord<String, String> consumerRecord = consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0); System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp())); } AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic()); AdminUtils.deleteTopic(zkUtils, topicPartitionWithLogAppendTime.topic()); } } {code} The output shows that in case of *log append time* the timestamps differ. {code} Create messages into topic test-topic-with-create-time-0 ... #1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623773788 #2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774178 #3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623774183 #4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623774188 #5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623774193 #6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623774197 #7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623774202 #8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623774207 #9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623774212 #10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623774217 Create messages into topic test-topic-with-log-append-time-0... #1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623774992 #2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774997 #3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623775002 #4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623775007 #5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623775011 #6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623775015 #7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623775020 #8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623775024 #9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623775029 #10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623775034 {code} I assume the timestamps in the record meta data represent the create time but could not ensure that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)